diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index dc218e86e0b1ec8c0e55162975b7a4365694d979..36aa5c0804dce46056ec750a887c819b917de608 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -686,6 +686,20 @@ struct SDiskData { SArray *aDiskCol; // SArray }; +struct SDiskDataBuilder { + int64_t suid; + int64_t uid; + int32_t nRow; + uint8_t cmprAlg; + uint8_t calcSma; + SCompressor *pUidC; + SCompressor *pVerC; + SCompressor *pKeyC; + int32_t nBuilder; + SArray *aBuilder; // SArray + uint8_t *aBuf[2]; +}; + int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pVerRange, void *pLoadInfo, const char *idStr); void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter); diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 1e2a3d5b52043699ed6737e7214fc86cdc714f8c..0ff9ae5085a735a2e40a3780dde688ef9d1bb056 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -70,12 +70,12 @@ typedef struct { int8_t toLastOnly; }; struct { - SDataFWriter *pWriter; - SArray *aBlockIdx; // SArray - SArray *aSttBlk; // SArray - SMapData mBlock; // SMapData - SBlockData bData; - SBlockData bDatal; + SDataFWriter *pWriter; + SArray *aBlockIdx; // SArray + SArray *aSttBlk; // SArray + SMapData mBlock; // SMapData + SBlockData bData; + SDiskDataBuilder *pBuilder; } dWriter; SSkmInfo skmTable; SSkmInfo skmRow; @@ -139,7 +139,7 @@ int32_t tsdbBegin(STsdb *pTsdb) { _exit: if (code) { - tsdbError("vgId:%d %s fail at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); } return code; } @@ -180,7 +180,7 @@ int32_t tsdbCommit(STsdb *pTsdb) { _exit: if (code) { tsdbEndCommit(&commith, code); - tsdbError("vgId:%d %s fail at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); } return code; } @@ -222,7 +222,7 @@ static int32_t tsdbCommitDelStart(SCommitter *pCommitter) { _exit: if (code) { - tsdbError("vgId:%d, %s fail at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); } else { tsdbDebug("vgId:%d, commit del start", TD_VID(pTsdb->pVnode)); } @@ -280,7 +280,7 @@ static int32_t tsdbCommitTableDel(SCommitter *pCommitter, STbData *pTbData, SDel _exit: if (code) { - tsdbError("vgId:%d, %s fail at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, + tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code)); } return code; @@ -314,7 +314,7 @@ static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) { _exit: if (code) { - tsdbError("vgId:%d, %s fail at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, + tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code)); } return code; @@ -466,7 +466,7 @@ static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) { _exit: if (code) { - tsdbError("vgId:%d %s fail at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code)); } return code; @@ -542,7 +542,6 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { taosArrayClear(pCommitter->dWriter.aSttBlk); tMapDataReset(&pCommitter->dWriter.mBlock); tBlockDataReset(&pCommitter->dWriter.bData); - tBlockDataReset(&pCommitter->dWriter.bDatal); // open iter code = tsdbOpenCommitIter(pCommitter); @@ -550,7 +549,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { _exit: if (code) { - tsdbError("vgId:%d %s fail at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); } return code; } @@ -602,7 +601,7 @@ int32_t tsdbWriteDataBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SMapDa _exit: if (code) { - tsdbError("vgId:%d %s fail at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code)); + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code)); } return code; } @@ -645,7 +644,7 @@ int32_t tsdbWriteSttBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SArray _exit: if (code) { - tsdbError("vgId:%d %s fail at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code)); + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code)); } return code; } @@ -681,7 +680,7 @@ static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) { _exit: if (code) { - tsdbError("vgId:%d %s fail at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code)); } return code; @@ -707,7 +706,7 @@ static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) { _exit: if (code) { - tsdbError("vgId:%d %s fail at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code)); } return code; @@ -734,7 +733,7 @@ static int32_t tsdbCommitFileData(SCommitter *pCommitter) { _exit: if (code) { - tsdbError("vgId:%d %s fail at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); tsdbDataFReaderClose(&pCommitter->dReader.pReader); tsdbDataFWriterClose(&pCommitter->dWriter.pWriter, 0); } @@ -772,7 +771,7 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) { _exit: if (code) { - tsdbError("vgId:%d %s fail at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); } return code; } @@ -820,12 +819,12 @@ static int32_t tsdbCommitDataStart(SCommitter *pCommitter) { code = tBlockDataCreate(&pCommitter->dWriter.bData); TSDB_CHECK_CODE(code, lino, _exit); - code = tBlockDataCreate(&pCommitter->dWriter.bDatal); + code = tDiskDataBuilderCreate(&pCommitter->dWriter.pBuilder); TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { - tsdbError("vgId:%d %s fail at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code)); } return code; @@ -849,7 +848,7 @@ static void tsdbCommitDataEnd(SCommitter *pCommitter) { taosArrayDestroy(pCommitter->dWriter.aSttBlk); tMapDataClear(&pCommitter->dWriter.mBlock); tBlockDataDestroy(&pCommitter->dWriter.bData, 1); - tBlockDataDestroy(&pCommitter->dWriter.bDatal, 1); + tDiskDataBuilderDestroy(pCommitter->dWriter.pBuilder); tTSchemaDestroy(pCommitter->skmTable.pTSchema); tTSchemaDestroy(pCommitter->skmRow.pTSchema); } @@ -880,7 +879,7 @@ static int32_t tsdbCommitData(SCommitter *pCommitter) { _exit: if (code) { - tsdbError("vgId:%d %s fail at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); } return code; } @@ -966,7 +965,7 @@ static int32_t tsdbCommitDel(SCommitter *pCommitter) { _exit: if (code) { - tsdbError("vgId:%d %s fail at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); } else { tsdbDebug("vgId:%d commit del done, nDel:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nDel); } @@ -1006,7 +1005,7 @@ static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) { _exit: if (code) { - tsdbError("vgId:%d %s fail at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); } else { tsdbInfo("vgId:%d tsdb end commit", TD_VID(pTsdb->pVnode)); } @@ -1100,7 +1099,7 @@ static int32_t tsdbNextCommitRow(SCommitter *pCommitter) { _exit: if (code) { - tsdbError("vgId:%d %s fail at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code)); } return code; @@ -1148,7 +1147,7 @@ static int32_t tsdbCommitAheadBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) _exit: if (code) { - tsdbError("vgId:%d %s fail at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code)); } return code; @@ -1235,7 +1234,7 @@ static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) _exit: if (code) { - tsdbError("vgId:%d %s fail at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code)); } return code; @@ -1310,7 +1309,7 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, TABLEID id) { _exit: if (code) { - tsdbError("vgId:%d %s fail at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code)); } return code; @@ -1320,25 +1319,26 @@ static int32_t tsdbInitLastBlockIfNeed(SCommitter *pCommitter, TABLEID id) { int32_t code = 0; int32_t lino = 0; - SBlockData *pBDatal = &pCommitter->dWriter.bDatal; - if (pBDatal->suid || pBDatal->uid) { - if ((pBDatal->suid != id.suid) || (id.suid == 0)) { - code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, pBDatal, pCommitter->dWriter.aSttBlk, pCommitter->cmprAlg); + SDiskDataBuilder *pBuilder = pCommitter->dWriter.pBuilder; + if (pBuilder->suid || pBuilder->uid) { + if (!TABLE_SAME_SCHEMA(pBuilder->suid, pBuilder->uid, id.suid, id.uid)) { + // code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, pBDatal, pCommitter->dWriter.aSttBlk, + // pCommitter->cmprAlg); // todo TSDB_CHECK_CODE(code, lino, _exit); - tBlockDataReset(pBDatal); + // tBlockDataReset(pBDatal); } } - if (!pBDatal->suid && !pBDatal->uid) { + if (!pBuilder->suid && !pBuilder->uid) { ASSERT(pCommitter->skmTable.suid == id.suid); ASSERT(pCommitter->skmTable.uid == id.uid); - code = tBlockDataInit(pBDatal, id.suid, id.suid ? 0 : id.uid, pCommitter->skmTable.pTSchema); + // code = tBlockDataInit(pBDatal, id.suid, id.suid ? 0 : id.uid, pCommitter->skmTable.pTSchema); todo TSDB_CHECK_CODE(code, lino, _exit); } _exit: if (code) { - tsdbError("vgId:%d %s fail at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code)); } return code; @@ -1349,7 +1349,6 @@ static int32_t tsdbAppendLastBlock(SCommitter *pCommitter) { int32_t lino = 0; SBlockData *pBData = &pCommitter->dWriter.bData; - SBlockData *pBDatal = &pCommitter->dWriter.bDatal; TABLEID id = {.suid = pBData->suid, .uid = pBData->uid}; code = tsdbInitLastBlockIfNeed(pCommitter, id); @@ -1357,18 +1356,20 @@ static int32_t tsdbAppendLastBlock(SCommitter *pCommitter) { for (int32_t iRow = 0; iRow < pBData->nRow; iRow++) { TSDBROW row = tsdbRowFromBlockData(pBData, iRow); - code = tBlockDataAppendRow(pBDatal, &row, NULL, pBData->uid); + + code = tDiskDataBuilderAddRow(pCommitter->dWriter.pBuilder, &row, NULL, &id); TSDB_CHECK_CODE(code, lino, _exit); - if (pBDatal->nRow >= pCommitter->maxRow) { - code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, pBDatal, pCommitter->dWriter.aSttBlk, pCommitter->cmprAlg); + if (pCommitter->dWriter.pBuilder->nRow >= pCommitter->maxRow) { + code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, NULL /*TODO */, pCommitter->dWriter.aSttBlk, + pCommitter->cmprAlg); TSDB_CHECK_CODE(code, lino, _exit); } } _exit: if (code) { - tsdbError("vgId:%d %s fail at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code)); } return code; @@ -1385,60 +1386,79 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) { if (pRowInfo == NULL) goto _exit; - SBlockData *pBData; if (pCommitter->toLastOnly) { - pBData = &pCommitter->dWriter.bDatal; - code = tsdbInitLastBlockIfNeed(pCommitter, id); - TSDB_CHECK_CODE(code, lino, _exit); - } else { - pBData = &pCommitter->dWriter.bData; - ASSERT(pBData->nRow == 0); - } + // init the data if need - while (pRowInfo) { - STSchema *pTSchema = NULL; - if (pRowInfo->row.type == 0) { - code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row)); + while (pRowInfo) { + STSchema *pTSchema = NULL; + if (pRowInfo->row.type == 0) { + code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row)); + TSDB_CHECK_CODE(code, lino, _exit); + pTSchema = pCommitter->skmRow.pTSchema; + } + + code = tDiskDataBuilderAddRow(pCommitter->dWriter.pBuilder, &pRowInfo->row, pTSchema, &id); TSDB_CHECK_CODE(code, lino, _exit); - pTSchema = pCommitter->skmRow.pTSchema; - } - code = tBlockDataAppendRow(pBData, &pRowInfo->row, pTSchema, id.uid); - TSDB_CHECK_CODE(code, lino, _exit); + code = tsdbNextCommitRow(pCommitter); + TSDB_CHECK_CODE(code, lino, _exit); - code = tsdbNextCommitRow(pCommitter); - TSDB_CHECK_CODE(code, lino, _exit); + pRowInfo = tsdbGetCommitRow(pCommitter); + if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) { + pRowInfo = NULL; + } - pRowInfo = tsdbGetCommitRow(pCommitter); - if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) { - pRowInfo = NULL; + if (pCommitter->dWriter.pBuilder->nRow >= pCommitter->maxRow) { + // code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, pBData, pCommitter->dWriter.aSttBlk, + // pCommitter->cmprAlg); (todo) + TSDB_CHECK_CODE(code, lino, _exit); + } } + } else { + SBlockData *pBData = &pCommitter->dWriter.bData; + ASSERT(pBData->nRow == 0); - if (pBData->nRow >= pCommitter->maxRow) { - if (pCommitter->toLastOnly) { - code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, pBData, pCommitter->dWriter.aSttBlk, pCommitter->cmprAlg); + while (pRowInfo) { + STSchema *pTSchema = NULL; + if (pRowInfo->row.type == 0) { + code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row)); TSDB_CHECK_CODE(code, lino, _exit); - } else { + pTSchema = pCommitter->skmRow.pTSchema; + } + + code = tBlockDataAppendRow(pBData, &pRowInfo->row, pTSchema, id.uid); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbNextCommitRow(pCommitter); + TSDB_CHECK_CODE(code, lino, _exit); + + pRowInfo = tsdbGetCommitRow(pCommitter); + if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) { + pRowInfo = NULL; + } + + if (pBData->nRow >= pCommitter->maxRow) { code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg); TSDB_CHECK_CODE(code, lino, _exit); } } - } - if (!pCommitter->toLastOnly && pBData->nRow) { - if (pBData->nRow > pCommitter->minRow) { - code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg); - TSDB_CHECK_CODE(code, lino, _exit); - } else { - code = tsdbAppendLastBlock(pCommitter); - TSDB_CHECK_CODE(code, lino, _exit); + if (pBData->nRow) { + if (pBData->nRow > pCommitter->minRow) { + code = + tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg); + TSDB_CHECK_CODE(code, lino, _exit); + } else { + code = tsdbAppendLastBlock(pCommitter); + TSDB_CHECK_CODE(code, lino, _exit); + } } } _exit: if (code) { - tsdbError("vgId:%d %s fail at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code)); } return code; @@ -1495,13 +1515,13 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { code = tsdbMoveCommitData(pCommitter, id); TSDB_CHECK_CODE(code, lino, _exit); - code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bDatal, pCommitter->dWriter.aSttBlk, - pCommitter->cmprAlg); + // code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bDatal, pCommitter->dWriter.aSttBlk, + // pCommitter->cmprAlg); TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { - tsdbError("vgId:%d %s fail at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code)); } return code; diff --git a/source/dnode/vnode/src/tsdb/tsdbDiskData.c b/source/dnode/vnode/src/tsdb/tsdbDiskData.c index 8d25873036dd872aa683404f23b4432a86fc5ef0..8c7933057dfdae0de115619de0b4ccd3efe5aa92 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDiskData.c +++ b/source/dnode/vnode/src/tsdb/tsdbDiskData.c @@ -34,20 +34,6 @@ struct SDiskColBuilder { uint8_t *aBuf[1]; }; -struct SDiskDataBuilder { - int64_t suid; - int64_t uid; - int32_t nRow; - uint8_t cmprAlg; - uint8_t calcSma; - SCompressor *pUidC; - SCompressor *pVerC; - SCompressor *pKeyC; - int32_t nBuilder; - SArray *aBuilder; // SArray - uint8_t *aBuf[2]; -}; - // SDiskColBuilder ================================================ #define tDiskColBuilderCreate() \ (SDiskColBuilder) { 0 }