diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 8992ee47c42325373651976628950f2ffe240862..05e08b6bf4ce27944ccb04f77c11177e34767dce 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -17,6 +17,8 @@ typedef enum { MEMORY_DATA_ITER = 0, STT_DATA_ITER } EDataIterT; +#define USE_STREAM_COMPRESSION 1 + typedef struct { SRBTreeNode n; SRowInfo r; @@ -70,12 +72,16 @@ typedef struct { int8_t toLastOnly; }; struct { - SDataFWriter *pWriter; - SArray *aBlockIdx; // SArray - SArray *aSttBlk; // SArray - SMapData mBlock; // SMapData - SBlockData bData; + SDataFWriter *pWriter; + SArray *aBlockIdx; // SArray + SArray *aSttBlk; // SArray + SMapData mBlock; // SMapData + SBlockData bData; +#if USE_STREAM_COMPRESSION SDiskDataBuilder *pBuilder; +#else + SBlockData bDatal; +#endif } dWriter; SSkmInfo skmTable; SSkmInfo skmRow; @@ -542,7 +548,11 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { taosArrayClear(pCommitter->dWriter.aSttBlk); tMapDataReset(&pCommitter->dWriter.mBlock); tBlockDataReset(&pCommitter->dWriter.bData); +#if USE_STREAM_COMPRESSION tDiskDataBuilderClear(pCommitter->dWriter.pBuilder); +#else + tBlockDataReset(&pCommitter->dWriter.bDatal); +#endif // open iter code = tsdbOpenCommitIter(pCommitter); @@ -860,7 +870,11 @@ static int32_t tsdbCommitDataStart(SCommitter *pCommitter) { code = tBlockDataCreate(&pCommitter->dWriter.bData); TSDB_CHECK_CODE(code, lino, _exit); +#if USE_STREAM_COMPRESSION code = tDiskDataBuilderCreate(&pCommitter->dWriter.pBuilder); +#else + code = tBlockDataCreate(&pCommitter->dWriter.bDatal); +#endif TSDB_CHECK_CODE(code, lino, _exit); _exit: @@ -889,7 +903,11 @@ static void tsdbCommitDataEnd(SCommitter *pCommitter) { taosArrayDestroy(pCommitter->dWriter.aSttBlk); tMapDataClear(&pCommitter->dWriter.mBlock); tBlockDataDestroy(&pCommitter->dWriter.bData, 1); +#if USE_STREAM_COMPRESSION tDiskDataBuilderDestroy(pCommitter->dWriter.pBuilder); +#else + tBlockDataDestroy(&pCommitter->dWriter.bDatal, 1); +#endif tTSchemaDestroy(pCommitter->skmTable.pTSchema); tTSchemaDestroy(pCommitter->skmRow.pTSchema); } @@ -1360,6 +1378,7 @@ static int32_t tsdbInitSttBlockBuilderIfNeed(SCommitter *pCommitter, TABLEID id) int32_t code = 0; int32_t lino = 0; +#if USE_STREAM_COMPRESSION SDiskDataBuilder *pBuilder = pCommitter->dWriter.pBuilder; if (pBuilder->suid || pBuilder->uid) { if (!TABLE_SAME_SCHEMA(pBuilder->suid, pBuilder->uid, id.suid, id.uid)) { @@ -1376,6 +1395,25 @@ static int32_t tsdbInitSttBlockBuilderIfNeed(SCommitter *pCommitter, TABLEID id) code = tDiskDataBuilderInit(pBuilder, pCommitter->skmTable.pTSchema, &id, pCommitter->cmprAlg, 0); TSDB_CHECK_CODE(code, lino, _exit); } +#else + SBlockData *pBData = &pCommitter->dWriter.bDatal; + if (pBData->suid || pBData->uid) { + if (!TABLE_SAME_SCHEMA(pBData->suid, pBData->uid, id.suid, id.uid)) { + code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, pBData, pCommitter->dWriter.aSttBlk, pCommitter->cmprAlg); + TSDB_CHECK_CODE(code, lino, _exit); + + tBlockDataReset(pBData); + } + } + + if (!pBData->suid && !pBData->uid) { + ASSERT(pCommitter->skmTable.suid == id.suid); + ASSERT(pCommitter->skmTable.uid == id.uid); + TABLEID tid = {.suid = id.suid, .uid = id.suid ? 0 : id.uid}; + code = tBlockDataInit(pBData, &tid, pCommitter->skmTable.pTSchema, NULL, 0); + TSDB_CHECK_CODE(code, lino, _exit); + } +#endif _exit: if (code) { @@ -1398,6 +1436,7 @@ static int32_t tsdbAppendLastBlock(SCommitter *pCommitter) { for (int32_t iRow = 0; iRow < pBData->nRow; iRow++) { TSDBROW row = tsdbRowFromBlockData(pBData, iRow); +#if USE_STREAM_COMPRESSION code = tDiskDataAddRow(pCommitter->dWriter.pBuilder, &row, NULL, &id); TSDB_CHECK_CODE(code, lino, _exit); @@ -1408,6 +1447,16 @@ static int32_t tsdbAppendLastBlock(SCommitter *pCommitter) { code = tsdbInitSttBlockBuilderIfNeed(pCommitter, id); TSDB_CHECK_CODE(code, lino, _exit); } +#else + code = tBlockDataAppendRow(&pCommitter->dWriter.bDatal, &row, NULL, id.uid); + TSDB_CHECK_CODE(code, lino, _exit); + + if (pCommitter->dWriter.bDatal.nRow >= pCommitter->maxRow) { + code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bDatal, pCommitter->dWriter.aSttBlk, + pCommitter->cmprAlg); + TSDB_CHECK_CODE(code, lino, _exit); + } +#endif } _exit: @@ -1441,7 +1490,11 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) { pTSchema = pCommitter->skmRow.pTSchema; } +#if USE_STREAM_COMPRESSION code = tDiskDataAddRow(pCommitter->dWriter.pBuilder, &pRowInfo->row, pTSchema, &id); +#else + code = tBlockDataAppendRow(&pCommitter->dWriter.bDatal, &pRowInfo->row, pTSchema, id.uid); +#endif TSDB_CHECK_CODE(code, lino, _exit); code = tsdbNextCommitRow(pCommitter); @@ -1452,6 +1505,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) { pRowInfo = NULL; } +#if USE_STREAM_COMPRESSION if (pCommitter->dWriter.pBuilder->nRow >= pCommitter->maxRow) { code = tsdbCommitSttBlk(pCommitter->dWriter.pWriter, pCommitter->dWriter.pBuilder, pCommitter->dWriter.aSttBlk); TSDB_CHECK_CODE(code, lino, _exit); @@ -1459,6 +1513,13 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) { code = tsdbInitSttBlockBuilderIfNeed(pCommitter, id); TSDB_CHECK_CODE(code, lino, _exit); } +#else + if (pCommitter->dWriter.bData.nRow >= pCommitter->maxRow) { + code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bDatal, pCommitter->dWriter.aSttBlk, + pCommitter->cmprAlg); + TSDB_CHECK_CODE(code, lino, _exit); + } +#endif } } else { SBlockData *pBData = &pCommitter->dWriter.bData; @@ -1561,7 +1622,12 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { code = tsdbMoveCommitData(pCommitter, id); TSDB_CHECK_CODE(code, lino, _exit); +#if USE_STREAM_COMPRESSION code = tsdbCommitSttBlk(pCommitter->dWriter.pWriter, pCommitter->dWriter.pBuilder, pCommitter->dWriter.aSttBlk); +#else + code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bDatal, pCommitter->dWriter.aSttBlk, + pCommitter->cmprAlg); +#endif TSDB_CHECK_CODE(code, lino, _exit); _exit: