diff --git a/include/util/tarray2.h b/include/util/tarray2.h index 7a4ea9dce9bac7fcdbe7b787e23666831f33c601..db0fc759768cdaee63541df04e8c009905f1e07a 100644 --- a/include/util/tarray2.h +++ b/include/util/tarray2.h @@ -43,6 +43,8 @@ typedef void (*TArray2Cb)(void *); #define TARRAY2_INITIALIZER \ { 0, 0, NULL } #define TARRAY2_SIZE(a) ((a)->size) +#define TARRAY2_CAPACITY(a) ((a)->capacity) +#define TARRAY2_DATA(a) ((a)->data) #define TARRAY2_GET(a, i) ((a)->data[i]) #define TARRAY2_GET_PTR(a, i) (&((a)->data[i])) #define TARRAY2_FIRST(a) ((a)->data[0]) diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbFSet.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbFSet.h index 37046f311211763531399d2d9eb676b2eccaf7f0..ace5ebf17ac49c802c46532336868620618ce894 100644 --- a/source/dnode/vnode/src/tsdb/dev/inc/tsdbFSet.h +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbFSet.h @@ -57,8 +57,8 @@ SSttLvl *tsdbTFileSetGetLvl(STFileSet *fset, int32_t level); bool tsdbTFileSetIsEmpty(const STFileSet *fset); struct STFileOp { - int32_t fid; tsdb_fop_t optype; + int32_t fid; STFile of; // old file state STFile nf; // new file state }; diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFReaderWriter.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFReaderWriter.h index 7f6cea0abbdc7957ca4a7c8c5a3c9d2379eb61ea..65e95aa88f9f0100f4b66a50c8e57e9a9927694a 100644 --- a/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFReaderWriter.h +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFReaderWriter.h @@ -22,6 +22,8 @@ extern "C" { #endif +typedef TARRAY2(SSttBlk) TSttBlkArray; + // SSttFileReader ========================================== typedef struct SSttFSegReader SSttFSegReader; typedef struct SSttFileReader SSttFileReader; @@ -54,19 +56,20 @@ struct SSttFileReaderConfig { typedef struct SSttFileWriter SSttFileWriter; typedef struct SSttFileWriterConfig SSttFileWriterConfig; -int32_t tsdbSttFWriterOpen(const SSttFileWriterConfig *config, SSttFileWriter **ppWriter); -int32_t tsdbSttFWriterClose(SSttFileWriter **ppWriter, int8_t abort, struct STFileOp *op); -int32_t tsdbSttFWriteTSData(SSttFileWriter *pWriter, SRowInfo *pRowInfo); -int32_t tsdbSttFWriteTSDataBlock(SSttFileWriter *pWriter, SBlockData *pBlockData); -int32_t tsdbSttFWriteDLData(SSttFileWriter *pWriter, TABLEID *tbid, SDelData *pDelData); +int32_t tsdbSttFWriterOpen(const SSttFileWriterConfig *config, SSttFileWriter **writer); +int32_t tsdbSttFWriterClose(SSttFileWriter **writer, int8_t abort, STFileOp *op); +int32_t tsdbSttFWriteTSData(SSttFileWriter *writer, SRowInfo *pRowInfo); +int32_t tsdbSttFWriteTSDataBlock(SSttFileWriter *writer, SBlockData *pBlockData); +int32_t tsdbSttFWriteDLData(SSttFileWriter *writer, TABLEID *tbid, SDelData *pDelData); struct SSttFileWriterConfig { - STsdb *pTsdb; + STsdb *tsdb; int32_t maxRow; int32_t szPage; int8_t cmprAlg; - SSkmInfo *pSkmTb; - SSkmInfo *pSkmRow; + int64_t compVer; // compact version + SSkmInfo *skmTb; + SSkmInfo *skmRow; uint8_t **aBuf; STFile file; }; diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c b/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c index 430281dac7b2189f6f73f148566f3460ff964f68..08b77c85876564224e5b97c706591a7963ce2d83 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c @@ -58,12 +58,12 @@ static int32_t open_writer_with_new_stt(SCommitter *pCommitter) { TSDB_CHECK_CODE(code, lino, _exit); } - config.pTsdb = pTsdb; + config.tsdb = pTsdb; config.maxRow = pCommitter->maxRow; config.szPage = pVnode->config.tsdbPageSize; config.cmprAlg = pCommitter->cmprAlg; - config.pSkmTb = NULL; - config.pSkmRow = NULL; + config.skmTb = NULL; + config.skmRow = NULL; config.aBuf = NULL; config.file.type = TSDB_FTYPE_STT; config.file.did = did; @@ -93,12 +93,12 @@ static int32_t open_writer_with_exist_stt(SCommitter *pCommitter, const STFile * SSttFileWriterConfig config = { // - .pTsdb = pTsdb, + .tsdb = pTsdb, .maxRow = pCommitter->maxRow, .szPage = pVnode->config.tsdbPageSize, .cmprAlg = pCommitter->cmprAlg, - .pSkmTb = NULL, - .pSkmRow = NULL, + .skmTb = NULL, + .skmRow = NULL, .aBuf = NULL, .file = *pFile // }; diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c b/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c index 823d29edb4312d26d4132f704d0047d7ee6c33af..69a726f7d1e57abdce42bf81786c23cbc3130dd5 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c @@ -222,12 +222,12 @@ static int32_t tsdbMergeFileSetBegin(SMerger *merger) { // open stt file writer if (lvl) { SSttFileWriterConfig config = { - .pTsdb = merger->tsdb, + .tsdb = merger->tsdb, .maxRow = merger->maxRow, .szPage = merger->szPage, .cmprAlg = merger->cmprAlg, - .pSkmTb = &merger->skmTb, - .pSkmRow = &merger->skmRow, + .skmTb = &merger->skmTb, + .skmRow = &merger->skmRow, .aBuf = merger->aBuf, .file = fobj->f, }; @@ -235,12 +235,12 @@ static int32_t tsdbMergeFileSetBegin(SMerger *merger) { TSDB_CHECK_CODE(code, lino, _exit); } else { SSttFileWriterConfig config = { - .pTsdb = merger->tsdb, + .tsdb = merger->tsdb, .maxRow = merger->maxRow, .szPage = merger->szPage, .cmprAlg = merger->cmprAlg, - .pSkmTb = &merger->skmTb, - .pSkmRow = &merger->skmRow, + .skmTb = &merger->skmTb, + .skmRow = &merger->skmRow, .aBuf = merger->aBuf, .file = (STFile){ diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbSttFReaderWriter.c b/source/dnode/vnode/src/tsdb/dev/tsdbSttFReaderWriter.c index 4560e76c67c4db20e6d74e3da1ed27c0682b5e04..0cf9de2e1fa55151915e7ed7ba1ad9f18ea31cd4 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbSttFReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbSttFReaderWriter.c @@ -98,190 +98,157 @@ int32_t tsdbSttFSegReadSttBlock(SSttFSegReader *pSegReader, const void *pBlock) // SSttFWriter ============================================================ struct SSttFileWriter { SSttFileWriterConfig config; + struct { + bool opened; + } ctx; // file - STFile tFile; + STFile file; // data - SFSttFooter footer; - SBlockData bData; - SDelBlock dData; - STbStatisBlock sData; - SArray *aSttBlk; // SArray - SArray *aDelBlk; // SArray - SArray *aStatisBlk; // SArray + TARRAY2(SSttBlk) sttBlkArray; + TARRAY2(SDelBlk) delBlkArray; + TARRAY2(STbStatisBlk) statisBlkArray; void *bloomFilter; // TODO + SFSttFooter footer; + SBlockData bData[1]; + SDelBlock dData[1]; + STbStatisBlock sData[1]; // helper data SSkmInfo skmTb; SSkmInfo skmRow; int32_t aBufSize[5]; uint8_t *aBuf[5]; - STsdbFD *pFd; + STsdbFD *fd; }; -static int32_t write_timeseries_block(SSttFileWriter *pWriter) { +static int32_t tsdbSttFileDoWriteTSDataBlock(SSttFileWriter *writer) { + if (writer->bData->nRow == 0) return 0; + int32_t code = 0; - int32_t lino; + int32_t lino = 0; + SSttBlk sttBlk[1]; + + sttBlk->suid = writer->bData->suid; + sttBlk->minUid = writer->bData->uid ? writer->bData->uid : writer->bData->aUid[0]; + sttBlk->maxUid = writer->bData->uid ? writer->bData->uid : writer->bData->aUid[writer->bData->nRow - 1]; + sttBlk->minKey = sttBlk->maxKey = writer->bData->aTSKEY[0]; + sttBlk->minVer = sttBlk->maxVer = writer->bData->aVersion[0]; + sttBlk->nRow = writer->bData->nRow; + for (int32_t iRow = 1; iRow < writer->bData->nRow; iRow++) { + if (sttBlk->minKey > writer->bData->aTSKEY[iRow]) sttBlk->minKey = writer->bData->aTSKEY[iRow]; + if (sttBlk->maxKey < writer->bData->aTSKEY[iRow]) sttBlk->maxKey = writer->bData->aTSKEY[iRow]; + if (sttBlk->minVer > writer->bData->aVersion[iRow]) sttBlk->minVer = writer->bData->aVersion[iRow]; + if (sttBlk->maxVer < writer->bData->aVersion[iRow]) sttBlk->maxVer = writer->bData->aVersion[iRow]; + } + + code = tCmprBlockData(writer->bData, writer->config.cmprAlg, NULL, NULL, writer->config.aBuf, writer->aBufSize); + TSDB_CHECK_CODE(code, lino, _exit); - SBlockData *pBData = &pWriter->bData; - SSttBlk *pSttBlk = (SSttBlk *)taosArrayReserve(pWriter->aSttBlk, 1); - if (pSttBlk == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } + sttBlk->bInfo.offset = writer->file.size; + sttBlk->bInfo.szKey = writer->aBufSize[2] + writer->aBufSize[3]; + sttBlk->bInfo.szBlock = writer->aBufSize[0] + writer->aBufSize[1] + sttBlk->bInfo.szKey; - pSttBlk->suid = pBData->suid; - pSttBlk->minUid = pBData->uid ? pBData->uid : pBData->aUid[0]; - pSttBlk->maxUid = pBData->uid ? pBData->uid : pBData->aUid[pBData->nRow - 1]; - pSttBlk->minKey = pSttBlk->maxKey = pBData->aTSKEY[0]; - pSttBlk->minVer = pSttBlk->maxVer = pBData->aVersion[0]; - pSttBlk->nRow = pBData->nRow; - for (int32_t iRow = 1; iRow < pBData->nRow; iRow++) { - if (pSttBlk->minKey > pBData->aTSKEY[iRow]) pSttBlk->minKey = pBData->aTSKEY[iRow]; - if (pSttBlk->maxKey < pBData->aTSKEY[iRow]) pSttBlk->maxKey = pBData->aTSKEY[iRow]; - if (pSttBlk->minVer > pBData->aVersion[iRow]) pSttBlk->minVer = pBData->aVersion[iRow]; - if (pSttBlk->maxVer < pBData->aVersion[iRow]) pSttBlk->maxVer = pBData->aVersion[iRow]; - } - - TSDB_CHECK_CODE( // - code = tCmprBlockData( // - pBData, // - pWriter->config.cmprAlg, // - NULL, // - NULL, // - pWriter->config.aBuf, // - pWriter->aBufSize), // - lino, // - _exit); - - pSttBlk->bInfo.offset = pWriter->tFile.size; - pSttBlk->bInfo.szKey = pWriter->aBufSize[2] + pWriter->aBufSize[3]; - pSttBlk->bInfo.szBlock = pWriter->aBufSize[0] + pWriter->aBufSize[1] + pSttBlk->bInfo.szKey; - - for (int32_t iBuf = 3; iBuf >= 0; iBuf--) { - if (pWriter->aBufSize[iBuf]) { - TSDB_CHECK_CODE( // - code = tsdbWriteFile( // - pWriter->pFd, // - pWriter->tFile.size, // - pWriter->config.aBuf[iBuf], // - pWriter->aBufSize[iBuf]), // - lino, // - _exit); - - pWriter->tFile.size += pWriter->aBufSize[iBuf]; + for (int32_t i = 3; i >= 0; i--) { + if (writer->aBufSize[i]) { + code = tsdbWriteFile(writer->fd, writer->file.size, writer->config.aBuf[i], writer->aBufSize[i]); + TSDB_CHECK_CODE(code, lino, _exit); + writer->file.size += writer->aBufSize[i]; } } + tBlockDataClear(writer->bData); - tBlockDataClear(pBData); + code = TARRAY2_APPEND_P(&writer->sttBlkArray, sttBlk); + TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { - tsdbError( // - "vgId:%d %s failed at line %d since %s", // - TD_VID(pWriter->config.pTsdb->pVnode), // - __func__, // - lino, // - tstrerror(code)); - } else { - // tsdbTrace(); + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(writer->config.tsdb->pVnode), __func__, lino, + tstrerror(code)); } return code; } -static int32_t write_statistics_block(SSttFileWriter *pWriter) { +static int32_t tsdbSttFileDoWriteStatisBlock(SSttFileWriter *writer) { + if (writer->sData->nRow == 0) return 0; + int32_t code = 0; - int32_t lino; + int32_t lino = 0; - STbStatisBlk *pStatisBlk = (STbStatisBlk *)taosArrayReserve(pWriter->aStatisBlk, 1); - if (pStatisBlk == NULL) { - TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit); - } + STbStatisBlk statisBlk[1]; - pStatisBlk->nRow = pWriter->sData.nRow; - pStatisBlk->minTid.suid = pWriter->sData.aData[0][0]; - pStatisBlk->minTid.uid = pWriter->sData.aData[1][0]; - pStatisBlk->maxTid.suid = pWriter->sData.aData[0][pWriter->sData.nRow - 1]; - pStatisBlk->maxTid.uid = pWriter->sData.aData[1][pWriter->sData.nRow - 1]; - pStatisBlk->minVer = pStatisBlk->maxVer = pStatisBlk->maxVer = pWriter->sData.aData[2][0]; - for (int32_t iRow = 1; iRow < pWriter->sData.nRow; iRow++) { - if (pStatisBlk->minVer > pWriter->sData.aData[2][iRow]) pStatisBlk->minVer = pWriter->sData.aData[2][iRow]; - if (pStatisBlk->maxVer < pWriter->sData.aData[2][iRow]) pStatisBlk->maxVer = pWriter->sData.aData[2][iRow]; + statisBlk->nRow = writer->sData->nRow; + statisBlk->minTid.suid = writer->sData->aData[0][0]; + statisBlk->minTid.uid = writer->sData->aData[1][0]; + statisBlk->maxTid.suid = writer->sData->aData[0][writer->sData->nRow - 1]; + statisBlk->maxTid.uid = writer->sData->aData[1][writer->sData->nRow - 1]; + statisBlk->minVer = statisBlk->maxVer = statisBlk->maxVer = writer->sData->aData[2][0]; + for (int32_t iRow = 1; iRow < writer->sData->nRow; iRow++) { + if (statisBlk->minVer > writer->sData->aData[2][iRow]) statisBlk->minVer = writer->sData->aData[2][iRow]; + if (statisBlk->maxVer < writer->sData->aData[2][iRow]) statisBlk->maxVer = writer->sData->aData[2][iRow]; } - pStatisBlk->dp.offset = pWriter->tFile.size; - pStatisBlk->dp.size = 0; + statisBlk->dp.offset = writer->file.size; + statisBlk->dp.size = 0; // TODO: add compression here - int64_t tsize = sizeof(int64_t) * pWriter->sData.nRow; - for (int32_t i = 0; i < ARRAY_SIZE(pWriter->sData.aData); i++) { - TSDB_CHECK_CODE( // - code = tsdbWriteFile( // - pWriter->pFd, // - pWriter->tFile.size, // - (const uint8_t *)pWriter->sData.aData[i], // - tsize), // - lino, // - _exit); - - pStatisBlk->dp.size += tsize; - pWriter->tFile.size += tsize; + int64_t tsize = sizeof(int64_t) * writer->sData->nRow; + for (int32_t i = 0; i < ARRAY_SIZE(writer->sData->aData); i++) { + code = tsdbWriteFile(writer->fd, writer->file.size, (const uint8_t *)writer->sData->aData[i], tsize); + TSDB_CHECK_CODE(code, lino, _exit); + + statisBlk->dp.size += tsize; + writer->file.size += tsize; } + tTbStatisBlockClear(writer->sData); - tTbStatisBlockClear(&pWriter->sData); + code = TARRAY2_APPEND_P(&writer->statisBlkArray, statisBlk); + TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { - tsdbError( // - "vgId:%d %s failed at line %d since %s", // - TD_VID(pWriter->config.pTsdb->pVnode), // - __func__, // - lino, // - tstrerror(code)); - } else { - // tsdbTrace(); + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(writer->config.tsdb->pVnode), __func__, lino, + tstrerror(code)); } return code; } -static int32_t write_delete_block(SSttFileWriter *pWriter) { +static int32_t tsdbSttFileDoWriteDelBlock(SSttFileWriter *writer) { + if (writer->dData->nRow == 0) return 0; + int32_t code = 0; int32_t lino; - ASSERTS(0, "TODO: Not implemented yet"); - - SDelBlk *pDelBlk = taosArrayReserve(pWriter->aDelBlk, 1); - if (pDelBlk == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } + SDelBlk delBlk[1]; - pDelBlk->nRow = pWriter->sData.nRow; - pDelBlk->minTid.suid = pWriter->sData.aData[0][0]; - pDelBlk->minTid.uid = pWriter->sData.aData[1][0]; - pDelBlk->maxTid.suid = pWriter->sData.aData[0][pWriter->sData.nRow - 1]; - pDelBlk->maxTid.uid = pWriter->sData.aData[1][pWriter->sData.nRow - 1]; - pDelBlk->minVer = pDelBlk->maxVer = pDelBlk->maxVer = pWriter->sData.aData[2][0]; - for (int32_t iRow = 1; iRow < pWriter->sData.nRow; iRow++) { - if (pDelBlk->minVer > pWriter->sData.aData[2][iRow]) pDelBlk->minVer = pWriter->sData.aData[2][iRow]; - if (pDelBlk->maxVer < pWriter->sData.aData[2][iRow]) pDelBlk->maxVer = pWriter->sData.aData[2][iRow]; + delBlk->nRow = writer->sData->nRow; + delBlk->minTid.suid = writer->sData->aData[0][0]; + delBlk->minTid.uid = writer->sData->aData[1][0]; + delBlk->maxTid.suid = writer->sData->aData[0][writer->sData->nRow - 1]; + delBlk->maxTid.uid = writer->sData->aData[1][writer->sData->nRow - 1]; + delBlk->minVer = delBlk->maxVer = delBlk->maxVer = writer->sData->aData[2][0]; + for (int32_t iRow = 1; iRow < writer->sData->nRow; iRow++) { + if (delBlk->minVer > writer->sData->aData[2][iRow]) delBlk->minVer = writer->sData->aData[2][iRow]; + if (delBlk->maxVer < writer->sData->aData[2][iRow]) delBlk->maxVer = writer->sData->aData[2][iRow]; } - pDelBlk->dp.offset = pWriter->tFile.size; - pDelBlk->dp.size = 0; // TODO + delBlk->dp.offset = writer->file.size; + delBlk->dp.size = 0; // TODO - int64_t tsize = sizeof(int64_t) * pWriter->dData.nRow; - for (int32_t i = 0; i < ARRAY_SIZE(pWriter->dData.aData); i++) { - code = tsdbWriteFile(pWriter->pFd, pWriter->tFile.size, (const uint8_t *)pWriter->dData.aData[i], tsize); + int64_t tsize = sizeof(int64_t) * writer->dData->nRow; + for (int32_t i = 0; i < ARRAY_SIZE(writer->dData->aData); i++) { + code = tsdbWriteFile(writer->fd, writer->file.size, (const uint8_t *)writer->dData->aData[i], tsize); TSDB_CHECK_CODE(code, lino, _exit); - pDelBlk->dp.size += tsize; - pWriter->tFile.size += tsize; + delBlk->dp.size += tsize; + writer->file.size += tsize; } + tDelBlockDestroy(writer->dData); - tDelBlockDestroy(&pWriter->dData); + code = TARRAY2_APPEND_P(&writer->delBlkArray, delBlk); + TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->config.pTsdb->pVnode), __func__, lino, + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(writer->config.tsdb->pVnode), __func__, lino, tstrerror(code)); } else { // tsdbTrace(); @@ -289,423 +256,340 @@ _exit: return code; } -static int32_t write_stt_blk(SSttFileWriter *pWriter) { +static int32_t tsdbSttFileDoWriteSttBlk(SSttFileWriter *writer) { int32_t code = 0; int32_t lino; - pWriter->footer.dict[1].offset = pWriter->tFile.size; - pWriter->footer.dict[1].size = sizeof(SSttBlk) * taosArrayGetSize(pWriter->aSttBlk); + writer->footer.dict[1].offset = writer->file.size; + writer->footer.dict[1].size = sizeof(SSttBlk) * TARRAY2_SIZE(&writer->sttBlkArray); - if (pWriter->footer.dict[1].size) { - TSDB_CHECK_CODE( // - code = tsdbWriteFile( // - pWriter->pFd, // - pWriter->tFile.size, // - TARRAY_DATA(pWriter->aSttBlk), // - pWriter->footer.dict[1].size), // - lino, // - _exit); + if (writer->footer.dict[1].size) { + code = tsdbWriteFile(writer->fd, writer->file.size, (const uint8_t *)TARRAY2_DATA(&writer->sttBlkArray), + writer->footer.dict[1].size); + TSDB_CHECK_CODE(code, lino, _exit); - pWriter->tFile.size += pWriter->footer.dict[1].size; + writer->file.size += writer->footer.dict[1].size; } _exit: if (code) { - tsdbError( // - "vgId:%d %s failed at line %d since %s", // - TD_VID(pWriter->config.pTsdb->pVnode), // - __func__, // - lino, // - tstrerror(code)); + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(writer->config.tsdb->pVnode), __func__, lino, + tstrerror(code)); } return code; } -static int32_t write_statistics_blk(SSttFileWriter *pWriter) { +static int32_t tsdbSttFileDoWriteStatisBlk(SSttFileWriter *writer) { int32_t code = 0; int32_t lino; - pWriter->footer.dict[2].offset = pWriter->tFile.size; - pWriter->footer.dict[2].size = sizeof(STbStatisBlock) * taosArrayGetSize(pWriter->aStatisBlk); - - if (pWriter->footer.dict[2].size) { - TSDB_CHECK_CODE( // - code = tsdbWriteFile( // - pWriter->pFd, // - pWriter->tFile.size, // - TARRAY_DATA(pWriter->aStatisBlk), // - pWriter->footer.dict[2].size), // - lino, // - _exit); + writer->footer.dict[2].offset = writer->file.size; + writer->footer.dict[2].size = sizeof(STbStatisBlock) * TARRAY2_SIZE(&writer->statisBlkArray); - pWriter->tFile.size += pWriter->footer.dict[2].size; + if (writer->footer.dict[2].size) { + code = tsdbWriteFile(writer->fd, writer->file.size, (const uint8_t *)TARRAY2_DATA(&writer->statisBlkArray), + writer->footer.dict[2].size); + TSDB_CHECK_CODE(code, lino, _exit); + writer->file.size += writer->footer.dict[2].size; } _exit: if (code) { - tsdbError( // - "vgId:%d %s failed at line %d since %s", // - TD_VID(pWriter->config.pTsdb->pVnode), // - __func__, // - lino, // - tstrerror(code)); + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(writer->config.tsdb->pVnode), __func__, lino, + tstrerror(code)); } return code; } -static int32_t write_del_blk(SSttFileWriter *pWriter) { +static int32_t tsdbSttFileDoWriteDelBlk(SSttFileWriter *writer) { int32_t code = 0; int32_t lino; - pWriter->footer.dict[3].offset = pWriter->tFile.size; - pWriter->footer.dict[3].size = sizeof(SDelBlk) * taosArrayGetSize(pWriter->aDelBlk); - - if (pWriter->footer.dict[3].size) { - TSDB_CHECK_CODE( // - code = tsdbWriteFile( // - pWriter->pFd, // - pWriter->tFile.size, // - TARRAY_DATA(pWriter->aDelBlk), // - pWriter->footer.dict[3].size), // - lino, // - _exit); + writer->footer.dict[3].offset = writer->file.size; + writer->footer.dict[3].size = sizeof(SDelBlk) * TARRAY2_SIZE(&writer->delBlkArray); - pWriter->tFile.size += pWriter->footer.dict[3].size; + if (writer->footer.dict[3].size) { + code = tsdbWriteFile(writer->fd, writer->file.size, (const uint8_t *)TARRAY2_DATA(&writer->delBlkArray), + writer->footer.dict[3].size); + TSDB_CHECK_CODE(code, lino, _exit); + writer->file.size += writer->footer.dict[3].size; } _exit: if (code) { - tsdbError( // - "vgId:%d %s failed at line %d since %s", // - TD_VID(pWriter->config.pTsdb->pVnode), // - __func__, // - lino, // - tstrerror(code)); + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(writer->config.tsdb->pVnode), __func__, lino, + tstrerror(code)); } return code; } -static int32_t write_file_footer(SSttFileWriter *pWriter) { - int32_t code = tsdbWriteFile( // - pWriter->pFd, // - pWriter->tFile.size, // - (const uint8_t *)&pWriter->footer, // - sizeof(pWriter->footer)); - pWriter->tFile.size += sizeof(pWriter->footer); +static int32_t tsdbSttFileDoWriteFooter(SSttFileWriter *writer) { + int32_t code = tsdbWriteFile(writer->fd, writer->file.size, (const uint8_t *)&writer->footer, sizeof(writer->footer)); + writer->file.size += sizeof(writer->footer); return code; } -static int32_t write_file_header(SSttFileWriter *pWriter) { - // TODO - return 0; -} - -static int32_t create_stt_fwriter(const SSttFileWriterConfig *pConf, SSttFileWriter **ppWriter) { +static int32_t tsdbSttFWriterDoOpen(SSttFileWriter *writer) { int32_t code = 0; + int32_t lino = 0; + int32_t vid = TD_VID(writer->config.tsdb->pVnode); - // alloc - if (((ppWriter[0] = taosMemoryCalloc(1, sizeof(*ppWriter[0]))) == NULL) // - || ((ppWriter[0]->aSttBlk = taosArrayInit(64, sizeof(SSttBlk))) == NULL) // - || ((ppWriter[0]->aDelBlk = taosArrayInit(64, sizeof(SDelBlk))) == NULL) // - || ((ppWriter[0]->aStatisBlk = taosArrayInit(64, sizeof(STbStatisBlock))) == NULL) // - ) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } + // set + writer->file = writer->config.file; + writer->file.stt.nseg++; + if (!writer->config.skmTb) writer->config.skmTb = &writer->skmTb; + if (!writer->config.skmRow) writer->config.skmRow = &writer->skmRow; + if (!writer->config.aBuf) writer->config.aBuf = writer->aBuf; - if ((code = tBlockDataCreate(&ppWriter[0]->bData)) // - || (code = tDelBlockCreate(&ppWriter[0]->dData, pConf->maxRow)) // - || (code = tTbStatisBlockCreate(&ppWriter[0]->sData, pConf->maxRow)) // - ) { - goto _exit; - } + // open file + int32_t flag; + char fname[TSDB_FILENAME_LEN]; - // init - ppWriter[0]->config = pConf[0]; - ppWriter[0]->tFile = pConf->file; - ppWriter[0]->footer.prevFooter = ppWriter[0]->tFile.size; - if (pConf->pSkmTb == NULL) { - ppWriter[0]->config.pSkmTb = &ppWriter[0]->skmTb; - } - if (pConf->pSkmRow == NULL) { - ppWriter[0]->config.pSkmRow = &ppWriter[0]->skmRow; + if (writer->file.size) { + flag = TD_FILE_READ | TD_FILE_WRITE; + } else { + flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC; } - if (pConf->aBuf == NULL) { - ppWriter[0]->config.aBuf = ppWriter[0]->aBuf; + + tsdbTFileName(writer->config.tsdb, &writer->file, fname); + code = tsdbOpenFile(fname, writer->config.szPage, flag, &writer->fd); + TSDB_CHECK_CODE(code, lino, _exit); + + if (!writer->file.size) { + uint8_t hdr[TSDB_FHDR_SIZE] = {0}; + + code = tsdbWriteFile(writer->fd, 0, hdr, sizeof(hdr)); + TSDB_CHECK_CODE(code, lino, _exit); + writer->file.size += sizeof(hdr); } _exit: - if (code && ppWriter[0]) { - tTbStatisBlockDestroy(&ppWriter[0]->sData); - tDelBlockDestroy(&ppWriter[0]->dData); - tBlockDataDestroy(&ppWriter[0]->bData); - taosArrayDestroy(ppWriter[0]->aStatisBlk); - taosArrayDestroy(ppWriter[0]->aDelBlk); - taosArrayDestroy(ppWriter[0]->aSttBlk); - taosMemoryFree(ppWriter[0]); - ppWriter[0] = NULL; + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code)); + } else { + writer->ctx.opened = true; } - return code; + return 0; } -static int32_t destroy_stt_fwriter(SSttFileWriter *pWriter) { - if (pWriter) { - for (int32_t i = 0; i < ARRAY_SIZE(pWriter->aBuf); i++) { - tFree(pWriter->aBuf[i]); - } - tDestroyTSchema(pWriter->skmRow.pTSchema); - tDestroyTSchema(pWriter->skmTb.pTSchema); +static void tsdbSttFWriterDoClose(SSttFileWriter *pWriter) { + // TODO: do clear the struct +} - tTbStatisBlockDestroy(&pWriter->sData); - tDelBlockDestroy(&pWriter->dData); - tBlockDataDestroy(&pWriter->bData); +int32_t tsdbSttFWriterOpen(const SSttFileWriterConfig *config, SSttFileWriter **writer) { + writer[0] = taosMemoryMalloc(sizeof(*writer[0])); + if (writer[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY; - taosArrayDestroy(pWriter->aStatisBlk); - taosArrayDestroy(pWriter->aDelBlk); - taosArrayDestroy(pWriter->aSttBlk); - taosMemoryFree(pWriter); - } + writer[0]->config = config[0]; + writer[0]->ctx.opened = false; return 0; } -static int32_t open_stt_fwriter(SSttFileWriter *pWriter) { - int32_t code = 0; - int32_t lino = 0; - int32_t vid = TD_VID(pWriter->config.pTsdb->pVnode); - char fname[TSDB_FILENAME_LEN]; - uint8_t hdr[TSDB_FHDR_SIZE] = {0}; +static int32_t tsdbSttFileDoWriteBloomFilter(SSttFileWriter *writer) { + // TODO + return 0; +} +static int32_t tsdbSttFileDoUpdateHeader(SSttFileWriter *writer) { + // TODO + return 0; +} +static int32_t tsdbSttFWriterCloseCommit(SSttFileWriter *writer, STFileOp *op) { + int32_t lino; + int32_t code; + int32_t vid = TD_VID(writer->config.tsdb->pVnode); - int32_t flag = TD_FILE_READ | TD_FILE_WRITE; - if (pWriter->tFile.size == 0) { - flag |= (TD_FILE_CREATE | TD_FILE_TRUNC); - } + code = tsdbSttFileDoWriteTSDataBlock(writer); + TSDB_CHECK_CODE(code, lino, _exit); - tsdbTFileName(pWriter->config.pTsdb, &pWriter->config.file, fname); - code = tsdbOpenFile(fname, pWriter->config.szPage, flag, &pWriter->pFd); + code = tsdbSttFileDoWriteStatisBlock(writer); TSDB_CHECK_CODE(code, lino, _exit); - if (pWriter->tFile.size == 0) { - code = tsdbWriteFile(pWriter->pFd, 0, hdr, sizeof(hdr)); - TSDB_CHECK_CODE(code, lino, _exit); + code = tsdbSttFileDoWriteDelBlock(writer); + TSDB_CHECK_CODE(code, lino, _exit); - pWriter->tFile.size += sizeof(hdr); - } + code = tsdbSttFileDoWriteSttBlk(writer); + TSDB_CHECK_CODE(code, lino, _exit); -_exit: - if (code) { - if (pWriter->pFd) tsdbCloseFile(&pWriter->pFd); - tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code)); - } else { - tsdbDebug("vgId:%d %s done, fname:%s size:%" PRId64, vid, __func__, "" /*pWriter->config.file.fname*/, - pWriter->config.file.size); - } - return code; -} + code = tsdbSttFileDoWriteStatisBlk(writer); + TSDB_CHECK_CODE(code, lino, _exit); -static int32_t close_stt_fwriter(SSttFileWriter *pWriter) { - tsdbCloseFile(&pWriter->pFd); - return 0; -} + code = tsdbSttFileDoWriteDelBlk(writer); + TSDB_CHECK_CODE(code, lino, _exit); -int32_t tsdbSttFWriterOpen(const SSttFileWriterConfig *pConf, SSttFileWriter **ppWriter) { - int32_t code = 0; - int32_t lino = 0; - int32_t vid = TD_VID(pConf->pTsdb->pVnode); + code = tsdbSttFileDoWriteBloomFilter(writer); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbSttFileDoWriteFooter(writer); + TSDB_CHECK_CODE(code, lino, _exit); - code = create_stt_fwriter(pConf, ppWriter); + code = tsdbSttFileDoUpdateHeader(writer); TSDB_CHECK_CODE(code, lino, _exit); - code = open_stt_fwriter(ppWriter[0]); + code = tsdbFsyncFile(writer->fd); TSDB_CHECK_CODE(code, lino, _exit); + code = tsdbCloseFile(&writer->fd); + TSDB_CHECK_CODE(code, lino, _exit); + + ASSERT(writer->config.file.size > writer->file.size); + op->optype = writer->config.file.size ? TSDB_FOP_MODIFY : TSDB_FOP_CREATE; + op->fid = writer->config.file.fid; + op->of = writer->config.file; + op->nf = writer->file; + _exit: if (code) { tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code)); - if (ppWriter[0]) { - destroy_stt_fwriter(ppWriter[0]); - ppWriter[0] = NULL; - } } return code; } - -int32_t tsdbSttFWriterClose(SSttFileWriter **ppWriter, int8_t abort, struct STFileOp *op) { - int32_t vgId = TD_VID(ppWriter[0]->config.pTsdb->pVnode); +static int32_t tsdbSttFWriterCloseAbort(SSttFileWriter *writer) { + // TODO + ASSERT(0); + return 0; +} +int32_t tsdbSttFWriterClose(SSttFileWriter **writer, int8_t abort, STFileOp *op) { int32_t code = 0; int32_t lino = 0; + int32_t vid = TD_VID(writer[0]->config.tsdb->pVnode); - if (!abort) { - ppWriter[0]->tFile.stt.nseg++; - if (ppWriter[0]->bData.nRow > 0) { - code = write_timeseries_block(ppWriter[0]); + if (!writer[0]->ctx.opened) { + op->optype = TSDB_FOP_NONE; + } else { + if (!abort) { + code = tsdbSttFWriterCloseCommit(writer[0], op); TSDB_CHECK_CODE(code, lino, _exit); - } - - if (ppWriter[0]->sData.nRow > 0) { - code = write_statistics_block(ppWriter[0]); + } else { + code = tsdbSttFWriterCloseAbort(writer[0]); TSDB_CHECK_CODE(code, lino, _exit); } - - if (ppWriter[0]->dData.nRow > 0) { - code = write_delete_block(ppWriter[0]); - TSDB_CHECK_CODE(code, lino, _exit); - } - - code = write_stt_blk(ppWriter[0]); - TSDB_CHECK_CODE(code, lino, _exit); - - code = write_statistics_blk(ppWriter[0]); - TSDB_CHECK_CODE(code, lino, _exit); - - code = write_del_blk(ppWriter[0]); - TSDB_CHECK_CODE(code, lino, _exit); - - code = write_file_footer(ppWriter[0]); - TSDB_CHECK_CODE(code, lino, _exit); - - code = write_file_header(ppWriter[0]); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbFsyncFile(ppWriter[0]->pFd); - TSDB_CHECK_CODE(code, lino, _exit); - - if (op) { - STFile *f = &ppWriter[0]->config.file; - op->fid = f->fid; - if (f->size == 0) { - op->optype = TSDB_FOP_CREATE; - } else { - op->optype = TSDB_FOP_MODIFY; - } - op->of = f[0]; - op->nf = ppWriter[0]->tFile; - } + tsdbSttFWriterDoClose(writer[0]); } - - code = close_stt_fwriter(ppWriter[0]); - TSDB_CHECK_CODE(code, lino, _exit); - - destroy_stt_fwriter(ppWriter[0]); - ppWriter[0] = NULL; + taosMemoryFree(writer[0]); + writer[0] = NULL; _exit: if (code) { - tsdbError( // - "vgId:%d %s failed at line %d since %s", // - vgId, // - __func__, // - lino, // - tstrerror(code)); - } else { + tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code)); } return code; } -int32_t tsdbSttFWriteTSData(SSttFileWriter *pWriter, SRowInfo *pRowInfo) { +int32_t tsdbSttFWriteTSData(SSttFileWriter *writer, SRowInfo *row) { int32_t code = 0; - int32_t lino; + int32_t lino = 0; - TABLEID *tbid = (TABLEID *)pRowInfo; - TSDBROW *pRow = &pRowInfo->row; - TSDBKEY key = TSDBROW_KEY(pRow); + if (!writer->ctx.opened) { + code = tsdbSttFWriterDoOpen(writer); + TSDB_CHECK_CODE(code, lino, _exit); + } - if (!TABLE_SAME_SCHEMA(pWriter->bData.suid, pWriter->bData.uid, tbid->suid, tbid->uid)) { - if (pWriter->bData.nRow > 0) { - code = write_timeseries_block(pWriter); + TABLEID *tbid = (TABLEID *)row; + TSDBROW *pRow = &row->row; + TSDBKEY key = TSDBROW_KEY(pRow); + if (!TABLE_SAME_SCHEMA(writer->bData[0].suid, writer->bData[0].uid, tbid->suid, tbid->uid)) { + if (writer->bData[0].nRow > 0) { + code = tsdbSttFileDoWriteTSDataBlock(writer); TSDB_CHECK_CODE(code, lino, _exit); } - if (pWriter->sData.nRow >= pWriter->config.maxRow) { - code = write_statistics_block(pWriter); + if (writer->sData[0].nRow >= writer->config.maxRow) { + code = tsdbSttFileDoWriteStatisBlock(writer); TSDB_CHECK_CODE(code, lino, _exit); } - pWriter->sData.aData[0][pWriter->sData.nRow] = tbid->suid; // suid - pWriter->sData.aData[1][pWriter->sData.nRow] = tbid->uid; // uid - pWriter->sData.aData[2][pWriter->sData.nRow] = key.ts; // skey - pWriter->sData.aData[3][pWriter->sData.nRow] = key.version; // sver - pWriter->sData.aData[4][pWriter->sData.nRow] = key.ts; // ekey - pWriter->sData.aData[5][pWriter->sData.nRow] = key.version; // ever - pWriter->sData.aData[6][pWriter->sData.nRow] = 1; // count - pWriter->sData.nRow++; + writer->sData[0].aData[0][writer->sData[0].nRow] = tbid->suid; // suid + writer->sData[0].aData[1][writer->sData[0].nRow] = tbid->uid; // uid + writer->sData[0].aData[2][writer->sData[0].nRow] = key.ts; // skey + writer->sData[0].aData[3][writer->sData[0].nRow] = key.version; // sver + writer->sData[0].aData[4][writer->sData[0].nRow] = key.ts; // ekey + writer->sData[0].aData[5][writer->sData[0].nRow] = key.version; // ever + writer->sData[0].aData[6][writer->sData[0].nRow] = 1; // count + writer->sData[0].nRow++; - code = tsdbUpdateSkmTb(pWriter->config.pTsdb, tbid, pWriter->config.pSkmTb); + code = tsdbUpdateSkmTb(writer->config.tsdb, tbid, writer->config.skmTb); TSDB_CHECK_CODE(code, lino, _exit); TABLEID id = { .suid = tbid->suid, .uid = tbid->uid ? 0 : tbid->uid, }; - code = tBlockDataInit(&pWriter->bData, &id, pWriter->config.pSkmTb->pTSchema, NULL, 0); + code = tBlockDataInit(&writer->bData[0], &id, writer->config.skmTb->pTSchema, NULL, 0); TSDB_CHECK_CODE(code, lino, _exit); } - if (pRowInfo->row.type == TSDBROW_ROW_FMT) { - code = tsdbUpdateSkmRow(pWriter->config.pTsdb, tbid, TSDBROW_SVERSION(pRow), pWriter->config.pSkmRow); + if (row->row.type == TSDBROW_ROW_FMT) { + code = tsdbUpdateSkmRow(writer->config.tsdb, tbid, TSDBROW_SVERSION(pRow), writer->config.skmRow); TSDB_CHECK_CODE(code, lino, _exit); } - code = tBlockDataAppendRow(&pWriter->bData, pRow, pWriter->config.pSkmRow->pTSchema, tbid->uid); + code = tBlockDataAppendRow(&writer->bData[0], pRow, writer->config.skmRow->pTSchema, tbid->uid); TSDB_CHECK_CODE(code, lino, _exit); - if (pWriter->bData.nRow >= pWriter->config.maxRow) { - code = write_timeseries_block(pWriter); + if (writer->bData[0].nRow >= writer->config.maxRow) { + code = tsdbSttFileDoWriteTSDataBlock(writer); TSDB_CHECK_CODE(code, lino, _exit); } - if (key.ts > pWriter->sData.aData[4][pWriter->sData.nRow - 1]) { - pWriter->sData.aData[4][pWriter->sData.nRow - 1] = key.ts; // ekey - pWriter->sData.aData[5][pWriter->sData.nRow - 1] = key.version; // ever - pWriter->sData.aData[6][pWriter->sData.nRow - 1]++; // count - } else if (key.ts == pWriter->sData.aData[4][pWriter->sData.nRow - 1]) { - pWriter->sData.aData[4][pWriter->sData.nRow - 1] = key.ts; // ekey - pWriter->sData.aData[5][pWriter->sData.nRow - 1] = key.version; // ever + if (key.ts > writer->sData[0].aData[4][writer->sData[0].nRow - 1]) { + writer->sData[0].aData[4][writer->sData[0].nRow - 1] = key.ts; // ekey + writer->sData[0].aData[5][writer->sData[0].nRow - 1] = key.version; // ever + writer->sData[0].aData[6][writer->sData[0].nRow - 1]++; // count + } else if (key.ts == writer->sData[0].aData[4][writer->sData[0].nRow - 1]) { + writer->sData[0].aData[4][writer->sData[0].nRow - 1] = key.ts; // ekey + writer->sData[0].aData[5][writer->sData[0].nRow - 1] = key.version; // ever } else { ASSERTS(0, "timestamp should be in ascending order"); } _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->config.pTsdb->pVnode), __func__, lino, + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(writer->config.tsdb->pVnode), __func__, lino, tstrerror(code)); } return code; } -int32_t tsdbSttFWriteTSDataBlock(SSttFileWriter *pWriter, SBlockData *pBlockData) { +int32_t tsdbSttFWriteTSDataBlock(SSttFileWriter *writer, SBlockData *bdata) { int32_t code = 0; int32_t lino = 0; SRowInfo rowInfo; - rowInfo.suid = pBlockData->suid; - for (int32_t i = 0; i < pBlockData->nRow; i++) { - rowInfo.uid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[i]; - rowInfo.row = tsdbRowFromBlockData(pBlockData, i); + rowInfo.suid = bdata->suid; + for (int32_t i = 0; i < bdata->nRow; i++) { + rowInfo.uid = bdata->uid ? bdata->uid : bdata->aUid[i]; + rowInfo.row = tsdbRowFromBlockData(bdata, i); - code = tsdbSttFWriteTSData(pWriter, &rowInfo); + code = tsdbSttFWriteTSData(writer, &rowInfo); TSDB_CHECK_CODE(code, lino, _exit); } _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->config.pTsdb->pVnode), __func__, lino, + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(writer->config.tsdb->pVnode), __func__, lino, tstrerror(code)); } return 0; } -int32_t tsdbSttFWriteDLData(SSttFileWriter *pWriter, TABLEID *tbid, SDelData *pDelData) { +int32_t tsdbSttFWriteDLData(SSttFileWriter *writer, TABLEID *tbid, SDelData *pDelData) { ASSERTS(0, "TODO: Not implemented yet"); - pWriter->dData.aData[0][pWriter->dData.nRow] = tbid->suid; // suid - pWriter->dData.aData[1][pWriter->dData.nRow] = tbid->uid; // uid - pWriter->dData.aData[2][pWriter->dData.nRow] = pDelData->version; // version - pWriter->dData.aData[3][pWriter->dData.nRow] = pDelData->sKey; // skey - pWriter->dData.aData[4][pWriter->dData.nRow] = pDelData->eKey; // ekey - pWriter->dData.nRow++; + int32_t code; + if (!writer->ctx.opened) { + code = tsdbSttFWriterDoOpen(writer); + return code; + } + + writer->dData[0].aData[0][writer->dData[0].nRow] = tbid->suid; // suid + writer->dData[0].aData[1][writer->dData[0].nRow] = tbid->uid; // uid + writer->dData[0].aData[2][writer->dData[0].nRow] = pDelData->version; // version + writer->dData[0].aData[3][writer->dData[0].nRow] = pDelData->sKey; // skey + writer->dData[0].aData[4][writer->dData[0].nRow] = pDelData->eKey; // ekey + writer->dData[0].nRow++; - if (pWriter->dData.nRow >= pWriter->config.maxRow) { - return write_delete_block(pWriter); + if (writer->dData[0].nRow >= writer->config.maxRow) { + return tsdbSttFileDoWriteDelBlock(writer); } else { return 0; }