diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbMerge.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbMerge.h index ff35f28b76147cbdb740151bd14549ad82fd8c02..a2a0dfc7f07a439dc633e8afc2b8c317d0c1b6e3 100644 --- a/source/dnode/vnode/src/tsdb/dev/inc/tsdbMerge.h +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbMerge.h @@ -16,6 +16,7 @@ #include "tsdbDataFReaderWriter.h" #include "tsdbFS.h" #include "tsdbSttFReaderWriter.h" +#include "tsdbUtil.h" #ifndef _TD_TSDB_MERGE_H_ #define _TD_TSDB_MERGE_H_ diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c b/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c index 8d8f06e90c59ec59affb14c6ef5f8546d1de9a43..f8b972562faed23471736e11c52fc68dca935f91 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c @@ -30,6 +30,7 @@ typedef struct { SMergeCtx ctx; // config int32_t maxRow; + int32_t minRow; int32_t szPage; int8_t cmprAlg; int64_t cid; @@ -82,6 +83,28 @@ static int32_t tsdbMergeNextRow(SMerger *merger) { return 0; } +static int32_t tsdbMergeToDataWriteTSDataBlock(SMerger *merger) { + if (merger->ctx.bData.nRow == 0) return 0; + + int32_t code = 0; + int32_t lino = 0; + int32_t vid = TD_VID(merger->tsdb->pVnode); + if (merger->ctx.bData.nRow >= merger->minRow) { + // code = tsdbDataFWriteTSDataBlock(merger->dataWriter, &merger->ctx.bData); + // TSDB_CHECK_CODE(code, lino, _exit); + } else { + code = tsdbSttFWriteTSDataBlock(merger->sttWriter, &merger->ctx.bData); + TSDB_CHECK_CODE(code, lino, _exit); + } + + tBlockDataReset(&merger->ctx.bData); + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code)); + } + return code; +} static int32_t tsdbMergeToData(SMerger *merger) { int32_t code = 0; int32_t lino = 0; @@ -91,16 +114,30 @@ static int32_t tsdbMergeToData(SMerger *merger) { code = tsdbMergeNextRow(merger); TSDB_CHECK_CODE(code, lino, _exit); - if (!merger->ctx.pRowInfo) break; + if (!merger->ctx.pRowInfo) { + code = tsdbMergeToDataWriteTSDataBlock(merger); + TSDB_CHECK_CODE(code, lino, _exit); + break; + } + + if (!TABLE_SAME_SCHEMA(merger->ctx.bData.suid, merger->ctx.bData.suid, merger->ctx.pRowInfo->suid, + merger->ctx.pRowInfo->uid)) { + code = tsdbMergeToDataWriteTSDataBlock(merger); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbUpdateSkmTb(merger->tsdb, (TABLEID *)merger->ctx.pRowInfo, &merger->skmTb); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tBlockDataInit(&merger->ctx.bData, (TABLEID *)merger->ctx.pRowInfo, merger->skmTb.pTSchema, NULL, 0); + TSDB_CHECK_CODE(code, lino, _exit); + } code = tBlockDataAppendRow(&merger->ctx.bData, &merger->ctx.pRowInfo->row, NULL, merger->ctx.pRowInfo->uid); TSDB_CHECK_CODE(code, lino, _exit); if (merger->ctx.bData.nRow >= merger->maxRow) { - // code = tsdbDataFWriteTSDataBlock(merger->dataWriter, &merger->ctx.bData); - // TSDB_CHECK_CODE(code, lino, _exit); - - tBlockDataReset(&merger->ctx.bData); + code = tsdbMergeToDataWriteTSDataBlock(merger); + TSDB_CHECK_CODE(code, lino, _exit); } } diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbSttFReaderWriter.c b/source/dnode/vnode/src/tsdb/dev/tsdbSttFReaderWriter.c index d793a257447bedb359a5e4dc28f37e9c97cad2f4..4560e76c67c4db20e6d74e3da1ed27c0682b5e04 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbSttFReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbSttFReaderWriter.c @@ -673,7 +673,24 @@ _exit: } int32_t tsdbSttFWriteTSDataBlock(SSttFileWriter *pWriter, SBlockData *pBlockData) { - // TODO + int32_t code = 0; + int32_t lino = 0; + + SRowInfo rowInfo; + rowInfo.suid = pBlockData->suid; + for (int32_t i = 0; i < pBlockData->nRow; i++) { + rowInfo.uid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[i]; + rowInfo.row = tsdbRowFromBlockData(pBlockData, i); + + code = tsdbSttFWriteTSData(pWriter, &rowInfo); + TSDB_CHECK_CODE(code, lino, _exit); + } + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->config.pTsdb->pVnode), __func__, lino, + tstrerror(code)); + } return 0; }