diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index f45858ae8d6b526cc07dcd9ef974b8cd268b70d1..526944317399446da33d4fc778b6e14b2949e882 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -16,6 +16,7 @@ #include "tsdb.h" #include "tsdbDataFileRW.h" #include "tsdbFS2.h" +#include "tsdbFSetRW.h" #include "tsdbIter.h" #include "tsdbSttFileRW.h" @@ -216,6 +217,8 @@ static int32_t tsdbSnapReadFileSetBegin(STsdbSnapReader* reader) { int32_t code = 0; int32_t lino = 0; + ASSERT(reader->ctx->fset == NULL); + if (reader->ctx->fsetArrIdx < TARRAY2_SIZE(reader->fsetArr)) { reader->ctx->fset = TARRAY2_GET(reader->fsetArr, reader->ctx->fsetArrIdx++); reader->ctx->isDataDone = false; @@ -387,7 +390,6 @@ int32_t tsdbSnapReaderOpen(STsdb* tsdb, int64_t sver, int64_t ever, int8_t type, int32_t code = 0; int32_t lino = 0; - // alloc reader[0] = (STsdbSnapReader*)taosMemoryCalloc(1, sizeof(*reader[0])); if (reader[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY; @@ -510,19 +512,21 @@ struct STsdbSnapWriter { int32_t maxRow; int8_t cmprAlg; int64_t commitID; + int32_t szPage; + int64_t compactVersion; + int64_t now; uint8_t* aBuf[5]; TFileSetArray* fsetArr; TFileOpArray fopArr[1]; struct { - bool fsetWriteBegin; - + bool fsetWriteBegin; int32_t fid; STFileSet* fset; - - bool hasData; - bool hasTomb; + SDiskID did; + bool hasData; + bool hasTomb; // reader SDataFileReader* dataReader; @@ -533,840 +537,466 @@ struct STsdbSnapWriter { SIterMerger* dataIterMerger; TTsdbIterArray tombIterArr[1]; SIterMerger* tombIterMerger; - } ctx[1]; - SDataFileWriter* dataWriter; - SSttFileWriter* sttWriter; - - SBlockData blockData[1]; - STombBlock tombBlock[1]; + // writer + SFSetWriter* fsetWriter; + } ctx[1]; }; -#if 0 -// SNAP_DATA_TSDB -static int32_t tsdbSnapWriteTableDataStart(STsdbSnapWriter* pWriter, TABLEID* pId) { +// APIs +static int32_t tsdbSnapWriteTimeSeriesRow(STsdbSnapWriter* writer, SRowInfo* row) { int32_t code = 0; int32_t lino = 0; - if (pId) { - pWriter->tbid = *pId; - } else { - pWriter->tbid = (TABLEID){INT64_MAX, INT64_MAX}; - } - - if (pWriter->pDIter) { - STsdbDataIter2* pIter = pWriter->pDIter; - - // assert last table data end - ASSERT(pIter->dIter.iRow >= pIter->dIter.bData.nRow); - ASSERT(pIter->dIter.iDataBlk >= pIter->dIter.mDataBlk.nItem); - - for (;;) { - if (pIter->dIter.iBlockIdx >= taosArrayGetSize(pIter->dIter.aBlockIdx)) { - pWriter->pDIter = NULL; - break; - } - - SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(pIter->dIter.aBlockIdx, pIter->dIter.iBlockIdx); - - int32_t c = tTABLEIDCmprFn(pBlockIdx, &pWriter->tbid); - if (c < 0) { - code = tsdbReadDataBlk(pIter->dIter.pReader, pBlockIdx, &pIter->dIter.mDataBlk); - TSDB_CHECK_CODE(code, lino, _exit); - - SBlockIdx* pNewBlockIdx = taosArrayReserve(pWriter->aBlockIdx, 1); - if (pNewBlockIdx == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - - pNewBlockIdx->suid = pBlockIdx->suid; - pNewBlockIdx->uid = pBlockIdx->uid; - - code = tsdbWriteDataBlk(pWriter->pDataFWriter, &pIter->dIter.mDataBlk, pNewBlockIdx); - TSDB_CHECK_CODE(code, lino, _exit); - - pIter->dIter.iBlockIdx++; - } else if (c == 0) { - code = tsdbReadDataBlk(pIter->dIter.pReader, pBlockIdx, &pIter->dIter.mDataBlk); - TSDB_CHECK_CODE(code, lino, _exit); - - pIter->dIter.iDataBlk = 0; - pIter->dIter.iBlockIdx++; - - break; - } else { - pIter->dIter.iDataBlk = pIter->dIter.mDataBlk.nItem; - break; - } + while (writer->ctx->hasData) { + SRowInfo* row1 = tsdbIterMergerGetData(writer->ctx->dataIterMerger); + if (row1 == NULL) { + writer->ctx->hasData = false; + break; } - } - - if (pId) { - code = tsdbUpdateTableSchema(pWriter->tsdb->pVnode->pMeta, pId->suid, pId->uid, &pWriter->skmTable); - TSDB_CHECK_CODE(code, lino, _exit); - - tMapDataReset(&pWriter->mDataBlk); - code = tBlockDataInit(&pWriter->bData, pId, pWriter->skmTable.pTSchema, NULL, 0); - TSDB_CHECK_CODE(code, lino, _exit); - } - - if (!TABLE_SAME_SCHEMA(pWriter->tbid.suid, pWriter->tbid.uid, pWriter->sData.suid, pWriter->sData.uid)) { - if ((pWriter->sData.nRow > 0)) { - code = tsdbWriteSttBlock(pWriter->pDataFWriter, &pWriter->sData, pWriter->aSttBlk, pWriter->cmprAlg); + int32_t c = tRowInfoCmprFn(row1, row); + if (c <= 0) { + code = tsdbFSetWriteRow(writer->ctx->fsetWriter, row1); TSDB_CHECK_CODE(code, lino, _exit); - } - if (pId) { - TABLEID id = {.suid = pWriter->tbid.suid, .uid = pWriter->tbid.suid ? 0 : pWriter->tbid.uid}; - code = tBlockDataInit(&pWriter->sData, &id, pWriter->skmTable.pTSchema, NULL, 0); + code = tsdbIterMergerNext(writer->ctx->dataIterMerger); TSDB_CHECK_CODE(code, lino, _exit); + } else { + break; } } -_exit: - if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->tsdb->pVnode), __func__, lino, tstrerror(code)); - } else { - tsdbTrace("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64, TD_VID(pWriter->tsdb->pVnode), __func__, - pWriter->tbid.suid, pWriter->tbid.uid); + if (row->suid == INT64_MAX) { + ASSERT(writer->ctx->hasData == false); + goto _exit; } - return code; -} - -static int32_t tsdbSnapWriteTableRowImpl(STsdbSnapWriter* pWriter, TSDBROW* pRow) { - int32_t code = 0; - int32_t lino = 0; - code = tBlockDataAppendRow(&pWriter->bData, pRow, pWriter->skmTable.pTSchema, pWriter->tbid.uid); + code = tsdbFSetWriteRow(writer->ctx->fsetWriter, row); TSDB_CHECK_CODE(code, lino, _exit); - if (pWriter->bData.nRow >= pWriter->maxRow) { - code = tsdbWriteDataBlock(pWriter->pDataFWriter, &pWriter->bData, &pWriter->mDataBlk, pWriter->cmprAlg); - TSDB_CHECK_CODE(code, lino, _exit); - } - _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->tsdb->pVnode), __func__, lino, tstrerror(code)); + TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code); } return code; } -static int32_t tsdbSnapWriteTableRow(STsdbSnapWriter* pWriter, TSDBROW* pRow) { +static int32_t tsdbSnapWriteFileSetOpenReader(STsdbSnapWriter* writer) { int32_t code = 0; int32_t lino = 0; - TSDBKEY inKey = pRow ? TSDBROW_KEY(pRow) : TSDBKEY_MAX; + ASSERT(writer->ctx->dataReader == NULL); + ASSERT(TARRAY2_SIZE(writer->ctx->sttReaderArr) == 0); - if (pWriter->pDIter == NULL || (pWriter->pDIter->dIter.iRow >= pWriter->pDIter->dIter.bData.nRow && - pWriter->pDIter->dIter.iDataBlk >= pWriter->pDIter->dIter.mDataBlk.nItem)) { - goto _write_row; - } else { - for (;;) { - while (pWriter->pDIter->dIter.iRow < pWriter->pDIter->dIter.bData.nRow) { - TSDBROW row = tsdbRowFromBlockData(&pWriter->pDIter->dIter.bData, pWriter->pDIter->dIter.iRow); - - int32_t c = tsdbKeyCmprFn(&inKey, &TSDBROW_KEY(&row)); - if (c < 0) { - goto _write_row; - } else if (c > 0) { - code = tsdbSnapWriteTableRowImpl(pWriter, &row); - TSDB_CHECK_CODE(code, lino, _exit); - - pWriter->pDIter->dIter.iRow++; - } else { - ASSERT(0); - } - } + if (writer->ctx->fset) { + // open data reader + SDataFileReaderConfig dataFileReaderConfig = { + .tsdb = writer->tsdb, + .bufArr = writer->aBuf, + .szPage = writer->szPage, + }; - for (;;) { - if (pWriter->pDIter->dIter.iDataBlk >= pWriter->pDIter->dIter.mDataBlk.nItem) goto _write_row; - - // FIXME: Here can be slow, use array instead - SDataBlk dataBlk; - tMapDataGetItemByIdx(&pWriter->pDIter->dIter.mDataBlk, pWriter->pDIter->dIter.iDataBlk, &dataBlk, tGetDataBlk); - - int32_t c = tDataBlkCmprFn(&dataBlk, &(SDataBlk){.minKey = inKey, .maxKey = inKey}); - if (c > 0) { - goto _write_row; - } else if (c < 0) { - if (pWriter->bData.nRow > 0) { - code = tsdbWriteDataBlock(pWriter->pDataFWriter, &pWriter->bData, &pWriter->mDataBlk, pWriter->cmprAlg); - TSDB_CHECK_CODE(code, lino, _exit); - } - - tMapDataPutItem(&pWriter->mDataBlk, &dataBlk, tPutDataBlk); - pWriter->pDIter->dIter.iDataBlk++; - } else { - code = tsdbReadDataBlockEx(pWriter->pDataFReader, &dataBlk, &pWriter->pDIter->dIter.bData); - TSDB_CHECK_CODE(code, lino, _exit); - - pWriter->pDIter->dIter.iRow = 0; - pWriter->pDIter->dIter.iDataBlk++; - break; - } + for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ++ftype) { + if (writer->ctx->fset->farr[ftype] == NULL) { + continue; } + + dataFileReaderConfig.files[ftype].exist = true; + dataFileReaderConfig.files[ftype].file = writer->ctx->fset->farr[ftype]->f[0]; } - } -_write_row: - if (pRow) { - code = tsdbSnapWriteTableRowImpl(pWriter, pRow); + code = tsdbDataFileReaderOpen(NULL, &dataFileReaderConfig, &writer->ctx->dataReader); TSDB_CHECK_CODE(code, lino, _exit); - } - -_exit: - if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->tsdb->pVnode), __func__, lino, tstrerror(code)); - } - return code; -} - -static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) { - int32_t code = 0; - int32_t lino = 0; - - // write a NULL row to end current table data write - code = tsdbSnapWriteTableRow(pWriter, NULL); - TSDB_CHECK_CODE(code, lino, _exit); - if (pWriter->bData.nRow > 0) { - if (pWriter->bData.nRow < pWriter->minRow) { - ASSERT(TABLE_SAME_SCHEMA(pWriter->sData.suid, pWriter->sData.uid, pWriter->tbid.suid, pWriter->tbid.uid)); - for (int32_t iRow = 0; iRow < pWriter->bData.nRow; iRow++) { - code = - tBlockDataAppendRow(&pWriter->sData, &tsdbRowFromBlockData(&pWriter->bData, iRow), NULL, pWriter->tbid.uid); + // open stt reader array + SSttLvl* lvl; + TARRAY2_FOREACH(writer->ctx->fset->lvlArr, lvl) { + STFileObj* fobj; + TARRAY2_FOREACH(lvl->fobjArr, fobj) { + SSttFileReader* reader; + SSttFileReaderConfig sttFileReaderConfig = { + .tsdb = writer->tsdb, + .szPage = writer->szPage, + .bufArr = writer->aBuf, + .file = fobj->f[0], + }; + + code = tsdbSttFileReaderOpen(fobj->fname, &sttFileReaderConfig, &reader); TSDB_CHECK_CODE(code, lino, _exit); - if (pWriter->sData.nRow >= pWriter->maxRow) { - code = tsdbWriteSttBlock(pWriter->pDataFWriter, &pWriter->sData, pWriter->aSttBlk, pWriter->cmprAlg); - TSDB_CHECK_CODE(code, lino, _exit); - } + code = TARRAY2_APPEND(writer->ctx->sttReaderArr, reader); + TSDB_CHECK_CODE(code, lino, _exit); } - - tBlockDataClear(&pWriter->bData); - } else { - code = tsdbWriteDataBlock(pWriter->pDataFWriter, &pWriter->bData, &pWriter->mDataBlk, pWriter->cmprAlg); - TSDB_CHECK_CODE(code, lino, _exit); - } - } - - if (pWriter->mDataBlk.nItem) { - SBlockIdx* pBlockIdx = taosArrayReserve(pWriter->aBlockIdx, 1); - if (pBlockIdx == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); } - - pBlockIdx->suid = pWriter->tbid.suid; - pBlockIdx->uid = pWriter->tbid.uid; - - code = tsdbWriteDataBlk(pWriter->pDataFWriter, &pWriter->mDataBlk, pBlockIdx); - TSDB_CHECK_CODE(code, lino, _exit); } _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->tsdb->pVnode), __func__, lino, tstrerror(code)); + TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code); } return code; } -static int32_t tsdbSnapWriteFileDataStart(STsdbSnapWriter* pWriter, int32_t fid) { +static int32_t tsdbSnapWriteFileSetCloseReader(STsdbSnapWriter* writer) { + TARRAY2_CLEAR(writer->ctx->sttReaderArr, tsdbSttFileReaderClose); + tsdbDataFileReaderClose(&writer->ctx->dataReader); + return 0; +} + +static int32_t tsdbSnapWriteFileSetOpenIter(STsdbSnapWriter* writer) { int32_t code = 0; int32_t lino = 0; - ASSERT(pWriter->pDataFWriter == NULL && pWriter->fid < fid); + // data ieter + if (writer->ctx->dataReader) { + STsdbIter* iter; + STsdbIterConfig config = {0}; - STsdb* pTsdb = pWriter->tsdb; - - pWriter->fid = fid; - pWriter->tbid = (TABLEID){0}; - SDFileSet* pSet = taosArraySearch(pWriter->fs.aDFileSet, &(SDFileSet){.fid = fid}, tDFileSetCmprFn, TD_EQ); + // data + config.type = TSDB_ITER_TYPE_DATA; + config.dataReader = writer->ctx->dataReader; - // open reader - pWriter->pDataFReader = NULL; - pWriter->iterList = NULL; - pWriter->pDIter = NULL; - pWriter->pSIter = NULL; - tRBTreeCreate(&pWriter->rbt, tsdbDataIterCmprFn); - if (pSet) { - code = tsdbDataFReaderOpen(&pWriter->pDataFReader, pTsdb, pSet); + code = tsdbIterOpen(&config, &iter); TSDB_CHECK_CODE(code, lino, _exit); - code = tsdbOpenDataFileDataIter(pWriter->pDataFReader, &pWriter->pDIter); + code = TARRAY2_APPEND(writer->ctx->dataIterArr, iter); TSDB_CHECK_CODE(code, lino, _exit); - if (pWriter->pDIter) { - pWriter->pDIter->next = pWriter->iterList; - pWriter->iterList = pWriter->pDIter; - } - for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) { - code = tsdbOpenSttFileDataIter(pWriter->pDataFReader, iStt, &pWriter->pSIter); - TSDB_CHECK_CODE(code, lino, _exit); + // tome + config.type = TSDB_ITER_TYPE_DATA_TOMB; + config.dataReader = writer->ctx->dataReader; - if (pWriter->pSIter) { - code = tsdbDataIterNext2(pWriter->pSIter, NULL); - TSDB_CHECK_CODE(code, lino, _exit); + code = tsdbIterOpen(&config, &iter); + TSDB_CHECK_CODE(code, lino, _exit); - // add to tree - tRBTreePut(&pWriter->rbt, &pWriter->pSIter->rbtn); + code = TARRAY2_APPEND(writer->ctx->tombIterArr, iter); + TSDB_CHECK_CODE(code, lino, _exit); + } - // add to list - pWriter->pSIter->next = pWriter->iterList; - pWriter->iterList = pWriter->pSIter; - } - } + // stt iter + SSttFileReader* sttFileReader; + TARRAY2_FOREACH(writer->ctx->sttReaderArr, sttFileReader) { + STsdbIter* iter; + STsdbIterConfig config = {0}; - pWriter->pSIter = NULL; - } + // data + config.type = TSDB_ITER_TYPE_STT; + config.sttReader = sttFileReader; - // open writer - SDiskID diskId; - if (pSet) { - diskId = pSet->diskId; - } else { - tfsAllocDisk(pTsdb->pVnode->pTfs, 0 /*TODO*/, &diskId); - tfsMkdirRecurAt(pTsdb->pVnode->pTfs, pTsdb->path, diskId); - } - SDFileSet wSet = {.diskId = diskId, - .fid = fid, - .pHeadF = &(SHeadFile){.commitID = pWriter->commitID}, - .pDataF = (pSet) ? pSet->pDataF : &(SDataFile){.commitID = pWriter->commitID}, - .pSmaF = (pSet) ? pSet->pSmaF : &(SSmaFile){.commitID = pWriter->commitID}, - .nSttF = 1, - .aSttF = {&(SSttFile){.commitID = pWriter->commitID}}}; - code = tsdbDataFWriterOpen(&pWriter->pDataFWriter, pTsdb, &wSet); - TSDB_CHECK_CODE(code, lino, _exit); + code = tsdbIterOpen(&config, &iter); + TSDB_CHECK_CODE(code, lino, _exit); - if (pWriter->aBlockIdx) { - taosArrayClear(pWriter->aBlockIdx); - } else if ((pWriter->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx))) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; + code = TARRAY2_APPEND(writer->ctx->dataIterArr, iter); TSDB_CHECK_CODE(code, lino, _exit); - } - tMapDataReset(&pWriter->mDataBlk); + // tomb + config.type = TSDB_ITER_TYPE_STT_TOMB; + config.sttReader = sttFileReader; - if (pWriter->aSttBlk) { - taosArrayClear(pWriter->aSttBlk); - } else if ((pWriter->aSttBlk = taosArrayInit(0, sizeof(SSttBlk))) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; + code = tsdbIterOpen(&config, &iter); + TSDB_CHECK_CODE(code, lino, _exit); + + code = TARRAY2_APPEND(writer->ctx->tombIterArr, iter); TSDB_CHECK_CODE(code, lino, _exit); } - tBlockDataReset(&pWriter->bData); - tBlockDataReset(&pWriter->sData); + // open merger + code = tsdbIterMergerOpen(writer->ctx->dataIterArr, &writer->ctx->dataIterMerger, false); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbIterMergerOpen(writer->ctx->tombIterArr, &writer->ctx->dataIterMerger, true); + TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s, fid:%d", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code), - fid); - } else { - tsdbDebug("vgId:%d %s done, fid:%d", TD_VID(pTsdb->pVnode), __func__, fid); + TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code); } return code; } -static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, SRowInfo* pRowInfo) { +static int32_t tsdbSnapWriteFileSetCloseIter(STsdbSnapWriter* writer) { + tsdbIterMergerClose(&writer->ctx->dataIterMerger); + tsdbIterMergerClose(&writer->ctx->tombIterMerger); + TARRAY2_CLEAR(writer->ctx->dataIterArr, tsdbIterClose); + TARRAY2_CLEAR(writer->ctx->tombIterArr, tsdbIterClose); + return 0; +} + +static int32_t tsdbSnapWriteFileSetOpenWriter(STsdbSnapWriter* writer) { int32_t code = 0; int32_t lino = 0; - // switch to new table if need - if (pRowInfo == NULL || pRowInfo->uid != pWriter->tbid.uid) { - if (pWriter->tbid.uid) { - code = tsdbSnapWriteTableDataEnd(pWriter); - TSDB_CHECK_CODE(code, lino, _exit); - } - - code = tsdbSnapWriteTableDataStart(pWriter, (TABLEID*)pRowInfo); - TSDB_CHECK_CODE(code, lino, _exit); - } - - if (pRowInfo == NULL) goto _exit; + SFSetWriterConfig config = { + .tsdb = writer->tsdb, + .toSttOnly = false, + .compactVersion = writer->compactVersion, + .minRow = writer->minRow, + .maxRow = writer->maxRow, + .szPage = writer->szPage, + .cmprAlg = writer->cmprAlg, + .fid = writer->ctx->fid, + .cid = writer->commitID, + .did = writer->ctx->did, + .level = 0, + }; - code = tsdbSnapWriteTableRow(pWriter, &pRowInfo->row); + code = tsdbFSetWriterOpen(&config, &writer->ctx->fsetWriter); TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->tsdb->pVnode), __func__, lino, tstrerror(code)); + TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code); } return code; } -static int32_t tsdbSnapWriteNextRow(STsdbSnapWriter* pWriter, SRowInfo** ppRowInfo) { +static int32_t tsdbSnapWriteFileSetCloseWriter(STsdbSnapWriter* writer) { + return tsdbFSetWriterClose(&writer->ctx->fsetWriter, 0, writer->fopArr); +} + +static int32_t tsdbSnapWriteFileSetBegin(STsdbSnapWriter* writer, int32_t fid) { int32_t code = 0; int32_t lino = 0; - if (pWriter->pSIter) { - code = tsdbDataIterNext2(pWriter->pSIter, NULL); - TSDB_CHECK_CODE(code, lino, _exit); + ASSERT(writer->ctx->fsetWriteBegin == false); - if (pWriter->pSIter->rowInfo.suid == 0 && pWriter->pSIter->rowInfo.uid == 0) { - pWriter->pSIter = NULL; - } else { - SRBTreeNode* pNode = tRBTreeMin(&pWriter->rbt); - if (pNode) { - int32_t c = tsdbDataIterCmprFn(&pWriter->pSIter->rbtn, pNode); - if (c > 0) { - tRBTreePut(&pWriter->rbt, &pWriter->pSIter->rbtn); - pWriter->pSIter = NULL; - } else if (c == 0) { - ASSERT(0); - } - } - } - } + STFileSet* fset = &(STFileSet){.fid = fid}; - if (pWriter->pSIter == NULL) { - SRBTreeNode* pNode = tRBTreeMin(&pWriter->rbt); - if (pNode) { - tRBTreeDrop(&pWriter->rbt, pNode); - pWriter->pSIter = TSDB_RBTN_TO_DATA_ITER(pNode); - } - } + writer->ctx->fid = fid; + writer->ctx->fset = TARRAY2_SEARCH_EX(writer->fsetArr, &fset, tsdbTFileSetCmprFn, TD_EQ); - if (ppRowInfo) { - if (pWriter->pSIter) { - *ppRowInfo = &pWriter->pSIter->rowInfo; - } else { - *ppRowInfo = NULL; - } + int32_t level = tsdbFidLevel(fid, &writer->tsdb->keepCfg, writer->now); + if (tfsAllocDisk(writer->tsdb->pVnode->pTfs, level, &writer->ctx->did)) { + code = TSDB_CODE_NO_AVAIL_DISK; + TSDB_CHECK_CODE(code, lino, _exit); } + writer->ctx->hasData = true; + writer->ctx->hasTomb = true; + + code = tsdbSnapWriteFileSetOpenReader(writer); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbSnapWriteFileSetOpenIter(writer); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbSnapWriteFileSetOpenWriter(writer); + TSDB_CHECK_CODE(code, lino, _exit); + + writer->ctx->fsetWriteBegin = true; + _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->tsdb->pVnode), __func__, lino, tstrerror(code)); + TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code); } return code; } -static int32_t tsdbSnapWriteGetRow(STsdbSnapWriter* pWriter, SRowInfo** ppRowInfo) { +static int32_t tsdbSnapWriteTombRecord(STsdbSnapWriter* writer, const STombRecord* record) { int32_t code = 0; int32_t lino = 0; - if (pWriter->pSIter) { - *ppRowInfo = &pWriter->pSIter->rowInfo; + while (writer->ctx->hasTomb) { + STombRecord* record1 = tsdbIterMergerGetTombRecord(writer->ctx->tombIterMerger); + if (record1 == NULL) { + writer->ctx->hasTomb = false; + break; + } + + int32_t c = tTombRecordCompare(record1, record); + if (c <= 0) { + code = tsdbFSetWriteTombRecord(writer->ctx->fsetWriter, record1); + TSDB_CHECK_CODE(code, lino, _exit); + } else { + break; + } + } + + if (record->suid == INT64_MAX) { + ASSERT(writer->ctx->hasTomb == false); goto _exit; } - code = tsdbSnapWriteNextRow(pWriter, ppRowInfo); + code = tsdbFSetWriteTombRecord(writer->ctx->fsetWriter, record); TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->tsdb->pVnode), __func__, lino, tstrerror(code)); + TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code); } return code; } -static int32_t tsdbSnapWriteFileDataEnd(STsdbSnapWriter* pWriter) { +static int32_t tsdbSnapWriteFileSetEnd(STsdbSnapWriter* writer) { + if (!writer->ctx->fsetWriteBegin) return 0; + int32_t code = 0; int32_t lino = 0; - ASSERT(pWriter->pDataFWriter); + SRowInfo row = { + .suid = INT64_MAX, + .uid = INT64_MAX, + }; - // consume remain data and end with a NULL table row - SRowInfo* pRowInfo; - code = tsdbSnapWriteGetRow(pWriter, &pRowInfo); + code = tsdbSnapWriteTimeSeriesRow(writer, &row); TSDB_CHECK_CODE(code, lino, _exit); - for (;;) { - code = tsdbSnapWriteTableData(pWriter, pRowInfo); - TSDB_CHECK_CODE(code, lino, _exit); - - if (pRowInfo == NULL) break; - - code = tsdbSnapWriteNextRow(pWriter, &pRowInfo); - TSDB_CHECK_CODE(code, lino, _exit); - } - // do file-level updates - code = tsdbWriteSttBlk(pWriter->pDataFWriter, pWriter->aSttBlk); - TSDB_CHECK_CODE(code, lino, _exit); + STombRecord record = { + .suid = INT64_MAX, + .uid = INT64_MAX, + }; - code = tsdbWriteBlockIdx(pWriter->pDataFWriter, pWriter->aBlockIdx); + code = tsdbSnapWriteTombRecord(writer, &record); TSDB_CHECK_CODE(code, lino, _exit); - code = tsdbUpdateDFileSetHeader(pWriter->pDataFWriter); + // close write + code = tsdbSnapWriteFileSetCloseWriter(writer); TSDB_CHECK_CODE(code, lino, _exit); - code = tsdbFSUpsertFSet(&pWriter->fs, &pWriter->pDataFWriter->wSet); + code = tsdbSnapWriteFileSetCloseIter(writer); TSDB_CHECK_CODE(code, lino, _exit); - code = tsdbDataFWriterClose(&pWriter->pDataFWriter, 1); + code = tsdbSnapWriteFileSetCloseReader(writer); TSDB_CHECK_CODE(code, lino, _exit); - if (pWriter->pDataFReader) { - code = tsdbDataFReaderClose(&pWriter->pDataFReader); - TSDB_CHECK_CODE(code, lino, _exit); - } - - // clear sources - while (pWriter->iterList) { - STsdbDataIter2* pIter = pWriter->iterList; - pWriter->iterList = pIter->next; - tsdbCloseDataIter2(pIter); - } + writer->ctx->fsetWriteBegin = false; _exit: if (code) { - tsdbError("vgId:%d %s failed since %s", TD_VID(pWriter->tsdb->pVnode), __func__, tstrerror(code)); - } else { - tsdbDebug("vgId:%d %s is done", TD_VID(pWriter->tsdb->pVnode), __func__); + TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code); } return code; } -static int32_t tsdbSnapWriteTimeSeriesData(STsdbSnapWriter* pWriter, SSnapDataHdr* pHdr) { +static int32_t tsdbSnapWriteTimeSeriesData(STsdbSnapWriter* writer, SSnapDataHdr* hdr) { int32_t code = 0; int32_t lino = 0; - code = tDecmprBlockData(pHdr->data, pHdr->size, &pWriter->inData, pWriter->aBuf); - TSDB_CHECK_CODE(code, lino, _exit); + SBlockData blockData[1] = {0}; - ASSERT(pWriter->inData.nRow > 0); + code = tDecmprBlockData(hdr->data, hdr->size, blockData, writer->aBuf); + TSDB_CHECK_CODE(code, lino, _exit); - // switch to new data file if need - int32_t fid = tsdbKeyFid(pWriter->inData.aTSKEY[0], pWriter->minutes, pWriter->precision); - if (pWriter->fid != fid) { - if (pWriter->pDataFWriter) { - code = tsdbSnapWriteFileDataEnd(pWriter); - TSDB_CHECK_CODE(code, lino, _exit); - } + int32_t fid = tsdbKeyFid(blockData->aTSKEY[0], writer->minutes, writer->precision); + if (!writer->ctx->fsetWriteBegin || fid != writer->ctx->fid) { + code = tsdbSnapWriteFileSetEnd(writer); + TSDB_CHECK_CODE(code, lino, _exit); - code = tsdbSnapWriteFileDataStart(pWriter, fid); + code = tsdbSnapWriteFileSetBegin(writer, fid); TSDB_CHECK_CODE(code, lino, _exit); } - // loop write each row - SRowInfo* pRowInfo; - code = tsdbSnapWriteGetRow(pWriter, &pRowInfo); - TSDB_CHECK_CODE(code, lino, _exit); - for (int32_t iRow = 0; iRow < pWriter->inData.nRow; ++iRow) { - SRowInfo rInfo = {.suid = pWriter->inData.suid, - .uid = pWriter->inData.uid ? pWriter->inData.uid : pWriter->inData.aUid[iRow], - .row = tsdbRowFromBlockData(&pWriter->inData, iRow)}; - - for (;;) { - if (pRowInfo == NULL) { - code = tsdbSnapWriteTableData(pWriter, &rInfo); - TSDB_CHECK_CODE(code, lino, _exit); - break; - } else { - int32_t c = tRowInfoCmprFn(&rInfo, pRowInfo); - if (c < 0) { - code = tsdbSnapWriteTableData(pWriter, &rInfo); - TSDB_CHECK_CODE(code, lino, _exit); - break; - } else if (c > 0) { - code = tsdbSnapWriteTableData(pWriter, pRowInfo); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbSnapWriteNextRow(pWriter, &pRowInfo); - TSDB_CHECK_CODE(code, lino, _exit); - } else { - ASSERT(0); - } - } - } + for (int32_t i = 0; i < blockData->nRow; ++i) { + SRowInfo rowInfo = { + .suid = blockData->suid, + .uid = blockData->uid ? blockData->uid : blockData->aUid[i], + .row = tsdbRowFromBlockData(blockData, i), + }; + + code = tsdbSnapWriteTimeSeriesRow(writer, &rowInfo); + TSDB_CHECK_CODE(code, lino, _exit); } _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->tsdb->pVnode), __func__, lino, tstrerror(code)); + TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code); } else { - tsdbDebug("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64 " nRow:%d", TD_VID(pWriter->tsdb->pVnode), __func__, - pWriter->inData.suid, pWriter->inData.uid, pWriter->inData.nRow); + tsdbDebug("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64 " nRow:%d", TD_VID(writer->tsdb->pVnode), __func__, + blockData->suid, blockData->uid, blockData->nRow); } + tBlockDataDestroy(blockData); return code; } -// SNAP_DATA_DEL -static int32_t tsdbSnapWriteDelTableDataStart(STsdbSnapWriter* pWriter, TABLEID* pId) { +static int32_t tsdbSnapWriteDecmprTombBlock(SSnapDataHdr* hdr, STombBlock* tombBlock) { int32_t code = 0; int32_t lino = 0; - if (pId) { - pWriter->tbid = *pId; - } else { - pWriter->tbid = (TABLEID){.suid = INT64_MAX, .uid = INT64_MAX}; - } + // TODO - taosArrayClear(pWriter->aDelData); + ASSERT(0); - if (pWriter->pTIter) { - while (pWriter->pTIter->tIter.iDelIdx < taosArrayGetSize(pWriter->pTIter->tIter.aDelIdx)) { - SDelIdx* pDelIdx = taosArrayGet(pWriter->pTIter->tIter.aDelIdx, pWriter->pTIter->tIter.iDelIdx); +_exit: + return code; +} - int32_t c = tTABLEIDCmprFn(pDelIdx, &pWriter->tbid); - if (c < 0) { - code = tsdbReadDelDatav1(pWriter->pDelFReader, pDelIdx, pWriter->pTIter->tIter.aDelData, INT64_MAX); - TSDB_CHECK_CODE(code, lino, _exit); +static int32_t tsdbSnapWriteTombData(STsdbSnapWriter* writer, SSnapDataHdr* hdr) { + int32_t code = 0; + int32_t lino = 0; - SDelIdx* pDelIdxNew = taosArrayReserve(pWriter->aDelIdx, 1); - if (pDelIdxNew == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } + STombRecord record; + STombBlock tombBlock[1] = {0}; - pDelIdxNew->suid = pDelIdx->suid; - pDelIdxNew->uid = pDelIdx->uid; + code = tsdbSnapWriteDecmprTombBlock(hdr, tombBlock); + TSDB_CHECK_CODE(code, lino, _exit); - code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->pTIter->tIter.aDelData, pDelIdxNew); - TSDB_CHECK_CODE(code, lino, _exit); + tTombBlockGet(tombBlock, 0, &record); + int32_t fid = tsdbKeyFid(record.skey, writer->minutes, writer->precision); + if (!writer->ctx->fsetWriteBegin || fid != writer->ctx->fid) { + code = tsdbSnapWriteFileSetEnd(writer); + TSDB_CHECK_CODE(code, lino, _exit); - pWriter->pTIter->tIter.iDelIdx++; - } else if (c == 0) { - code = tsdbReadDelDatav1(pWriter->pDelFReader, pDelIdx, pWriter->aDelData, INT64_MAX); - TSDB_CHECK_CODE(code, lino, _exit); + code = tsdbSnapWriteFileSetBegin(writer, fid); + TSDB_CHECK_CODE(code, lino, _exit); + } - pWriter->pTIter->tIter.iDelIdx++; - break; - } else { - break; - } - } - } - -_exit: - if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->tsdb->pVnode), __func__, lino, tstrerror(code)); - } else { - tsdbTrace("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64, TD_VID(pWriter->tsdb->pVnode), __func__, - pWriter->tbid.suid, pWriter->tbid.uid); - } - return code; -} - -static int32_t tsdbSnapWriteDelTableDataEnd(STsdbSnapWriter* pWriter) { - int32_t code = 0; - int32_t lino = 0; - - if (taosArrayGetSize(pWriter->aDelData) > 0) { - SDelIdx* pDelIdx = taosArrayReserve(pWriter->aDelIdx, 1); - if (pDelIdx == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - - pDelIdx->suid = pWriter->tbid.suid; - pDelIdx->uid = pWriter->tbid.uid; - - code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->aDelData, pDelIdx); - TSDB_CHECK_CODE(code, lino, _exit); - } - -_exit: - if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->tsdb->pVnode), __func__, lino, tstrerror(code)); - } else { - tsdbTrace("vgId:%d %s done", TD_VID(pWriter->tsdb->pVnode), __func__); - } - return code; -} - -static int32_t tsdbSnapWriteDelTableData(STsdbSnapWriter* pWriter, TABLEID* pId, uint8_t* pData, int64_t size) { - int32_t code = 0; - int32_t lino = 0; - - if (pId == NULL || pId->uid != pWriter->tbid.uid) { - if (pWriter->tbid.uid) { - code = tsdbSnapWriteDelTableDataEnd(pWriter); - TSDB_CHECK_CODE(code, lino, _exit); - } - - code = tsdbSnapWriteDelTableDataStart(pWriter, pId); - TSDB_CHECK_CODE(code, lino, _exit); - } - - if (pId == NULL) goto _exit; - - int64_t n = 0; - while (n < size) { - SDelData delData; - n += tGetDelData(pData + n, &delData); - - if (taosArrayPush(pWriter->aDelData, &delData) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - } - ASSERT(n == size); - -_exit: - if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->tsdb->pVnode), __func__, lino, tstrerror(code)); - } - return code; -} - -static int32_t tsdbSnapWriteDelDataStart(STsdbSnapWriter* pWriter) { - int32_t code = 0; - int32_t lino = 0; - - STsdb* pTsdb = pWriter->tsdb; - SDelFile* pDelFile = pWriter->fs.pDelFile; - - pWriter->tbid = (TABLEID){0}; - - // reader - if (pDelFile) { - code = tsdbDelFReaderOpen(&pWriter->pDelFReader, pDelFile, pTsdb); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbOpenTombFileDataIter(pWriter->pDelFReader, &pWriter->pTIter); - TSDB_CHECK_CODE(code, lino, _exit); - } - - // writer - code = tsdbDelFWriterOpen(&pWriter->pDelFWriter, &(SDelFile){.commitID = pWriter->commitID}, pTsdb); - TSDB_CHECK_CODE(code, lino, _exit); - - if ((pWriter->aDelIdx = taosArrayInit(0, sizeof(SDelIdx))) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - if ((pWriter->aDelData = taosArrayInit(0, sizeof(SDelData))) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - -_exit: - if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); - } else { - tsdbDebug("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__); - } - return code; -} - -static int32_t tsdbSnapWriteDelDataEnd(STsdbSnapWriter* pWriter) { - int32_t code = 0; - int32_t lino = 0; - - STsdb* pTsdb = pWriter->tsdb; - - // end remaining table with NULL data - code = tsdbSnapWriteDelTableData(pWriter, NULL, NULL, 0); - TSDB_CHECK_CODE(code, lino, _exit); - - // update file-level info - code = tsdbWriteDelIdx(pWriter->pDelFWriter, pWriter->aDelIdx); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbUpdateDelFileHdr(pWriter->pDelFWriter); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbFSUpsertDelFile(&pWriter->fs, &pWriter->pDelFWriter->fDel); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbDelFWriterClose(&pWriter->pDelFWriter, 1); - TSDB_CHECK_CODE(code, lino, _exit); + if (writer->ctx->hasData) { + SRowInfo row = { + .suid = INT64_MAX, + .uid = INT64_MAX, + }; - if (pWriter->pDelFReader) { - code = tsdbDelFReaderClose(&pWriter->pDelFReader); + code = tsdbSnapWriteTimeSeriesRow(writer, &row); TSDB_CHECK_CODE(code, lino, _exit); } - if (pWriter->pTIter) { - tsdbCloseDataIter2(pWriter->pTIter); - pWriter->pTIter = NULL; - } + ASSERT(writer->ctx->hasData == false); -_exit: - if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); - } else { - tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__); - } - return code; -} - -static int32_t tsdbSnapWriteDelData(STsdbSnapWriter* pWriter, SSnapDataHdr* pHdr) { - int32_t code = 0; - int32_t lino = 0; - - STsdb* pTsdb = pWriter->tsdb; + for (int32_t i = 0; i < TOMB_BLOCK_SIZE(tombBlock); ++i) { + tTombBlockGet(tombBlock, i, &record); - // start to write del data if need - if (pWriter->pDelFWriter == NULL) { - code = tsdbSnapWriteDelDataStart(pWriter); + code = tsdbSnapWriteTombRecord(writer, &record); TSDB_CHECK_CODE(code, lino, _exit); } - // do write del data - code = tsdbSnapWriteDelTableData(pWriter, (TABLEID*)pHdr->data, pHdr->data + sizeof(TABLEID), - pHdr->size - sizeof(TABLEID)); - TSDB_CHECK_CODE(code, lino, _exit); + tTombBlockDestroy(tombBlock); _exit: if (code) { - tsdbError("vgId:%d %s failed since %s", TD_VID(pTsdb->pVnode), __func__, tstrerror(code)); - } else { - tsdbTrace("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__); + TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code); } return code; } -#endif -// APIs -int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWriter** ppWriter) { +int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWriter** writer) { int32_t code = 0; int32_t lino = 0; -#if 0 - // alloc - STsdbSnapWriter* pWriter = (STsdbSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter)); - if (pWriter == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - pWriter->tsdb = pTsdb; - pWriter->sver = sver; - pWriter->ever = ever; - pWriter->minutes = pTsdb->keepCfg.days; - pWriter->precision = pTsdb->keepCfg.precision; - pWriter->minRow = pTsdb->pVnode->config.tsdbCfg.minRows; - pWriter->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows; - pWriter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression; - pWriter->commitID = pTsdb->pVnode->state.commitID; - - code = tsdbFSCopy(pTsdb, &pWriter->fs); - TSDB_CHECK_CODE(code, lino, _exit); - - // SNAP_DATA_TSDB - code = tBlockDataCreate(&pWriter->inData); + writer[0] = taosMemoryCalloc(1, sizeof(*writer[0])); + if (writer[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY; + + writer[0]->tsdb = pTsdb; + writer[0]->sver = sver; + writer[0]->ever = ever; + writer[0]->minutes = pTsdb->keepCfg.days; + writer[0]->precision = pTsdb->keepCfg.precision; + writer[0]->minRow = pTsdb->pVnode->config.tsdbCfg.minRows; + writer[0]->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows; + writer[0]->commitID = tsdbFSAllocEid(pTsdb->pFS); + writer[0]->szPage = pTsdb->pVnode->config.tsdbPageSize; + writer[0]->compactVersion = INT64_MAX; + writer[0]->now = taosGetTimestampMs(); + + code = tsdbFSCreateCopySnapshot(pTsdb->pFS, &writer[0]->fsetArr); TSDB_CHECK_CODE(code, lino, _exit); - pWriter->fid = INT32_MIN; - - code = tBlockDataCreate(&pWriter->bData); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tBlockDataCreate(&pWriter->sData); - TSDB_CHECK_CODE(code, lino, _exit); - - // SNAP_DATA_DEL -#endif - _exit: if (code) { tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); - // if (pWriter) { - // tBlockDataDestroy(&pWriter->sData); - // tBlockDataDestroy(&pWriter->bData); - // tBlockDataDestroy(&pWriter->inData); - // tsdbFSDestroy(&pWriter->fs); - // taosMemoryFree(pWriter); - // pWriter = NULL; - // } } else { tsdbInfo("vgId:%d %s done, sver:%" PRId64 " ever:%" PRId64, TD_VID(pTsdb->pVnode), __func__, sver, ever); } - // *ppWriter = pWriter; return code; } @@ -1374,24 +1004,15 @@ int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* writer) { int32_t code = 0; int32_t lino = 0; -#if 0 - if (pWriter->pDataFWriter) { - code = tsdbSnapWriteFileDataEnd(pWriter); - TSDB_CHECK_CODE(code, lino, _exit); - } - - if (pWriter->pDelFWriter) { - code = tsdbSnapWriteDelDataEnd(pWriter); - TSDB_CHECK_CODE(code, lino, _exit); - } + code = tsdbSnapWriteFileSetEnd(writer); + TSDB_CHECK_CODE(code, lino, _exit); - code = tsdbFSPrepareCommit(pWriter->tsdb, &pWriter->fs); + code = tsdbFSEditBegin(writer->tsdb->pFS, writer->fopArr, TSDB_FEDIT_COMMIT); TSDB_CHECK_CODE(code, lino, _exit); -#endif _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(writer->tsdb->pVnode), __func__, lino, tstrerror(code)); + TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code); } else { tsdbDebug("vgId:%d %s done", TD_VID(writer->tsdb->pVnode), __func__); } @@ -1399,245 +1020,50 @@ _exit: } int32_t tsdbSnapWriterClose(STsdbSnapWriter** writer, int8_t rollback) { + if (writer[0] == NULL) return 0; + int32_t code = 0; int32_t lino = 0; -#if 0 - STsdbSnapWriter* pWriter = *writer; - STsdb* pTsdb = pWriter->tsdb; + STsdb* tsdb = writer[0]->tsdb; if (rollback) { - tsdbRollbackCommit(pWriter->tsdb); + code = tsdbFSEditAbort(writer[0]->tsdb->pFS); + TSDB_CHECK_CODE(code, lino, _exit); } else { - // lock - taosThreadRwlockWrlock(&pTsdb->rwLock); + taosThreadRwlockWrlock(&writer[0]->tsdb->rwLock); - code = tsdbFSCommit(pWriter->tsdb); + code = tsdbFSEditCommit(writer[0]->tsdb->pFS); if (code) { - taosThreadRwlockUnlock(&pTsdb->rwLock); - TSDB_CHECK_CODE(code, lino, _exit); - } - - // unlock - taosThreadRwlockUnlock(&pTsdb->rwLock); - } - - // SNAP_DATA_DEL - taosArrayDestroy(pWriter->aDelData); - taosArrayDestroy(pWriter->aDelIdx); - - // SNAP_DATA_TSDB - tBlockDataDestroy(&pWriter->sData); - tBlockDataDestroy(&pWriter->bData); - taosArrayDestroy(pWriter->aSttBlk); - tMapDataClear(&pWriter->mDataBlk); - taosArrayDestroy(pWriter->aBlockIdx); - tDestroyTSchema(pWriter->skmTable.pTSchema); - tBlockDataDestroy(&pWriter->inData); - - for (int32_t iBuf = 0; iBuf < sizeof(pWriter->aBuf) / sizeof(uint8_t*); iBuf++) { - tFree(pWriter->aBuf[iBuf]); - } - tsdbFSDestroy(&pWriter->fs); - taosMemoryFree(pWriter); - *writer = NULL; -#endif - -_exit: - if (code) { - // tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); - } else { - // tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__); - } - return code; -} - -static int32_t tsdbSnapWriteDoWriteTimeSeriesRow(STsdbSnapWriter* writer, const SRowInfo* row) { - int32_t code = 0; - int32_t lino = 0; - - // TODO - -_exit: - if (code) { - TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code); - } - return code; -} - -static int32_t tsdbSnapWriteTimeSeriesRow(STsdbSnapWriter* writer, const SRowInfo* row) { - int32_t code = 0; - int32_t lino = 0; - - while (writer->ctx->hasData) { - SRowInfo* row1 = tsdbIterMergerGetData(writer->ctx->dataIterMerger); - if (row1 == NULL) { - writer->ctx->hasData = false; - break; - } - - int32_t c = tRowInfoCmprFn(row1, row); - if (c <= 0) { - code = tsdbSnapWriteDoWriteTimeSeriesRow(writer, row1); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbIterMergerNext(writer->ctx->dataIterMerger); + taosThreadRwlockUnlock(&writer[0]->tsdb->rwLock); TSDB_CHECK_CODE(code, lino, _exit); - } else { - break; } - } - if (row->suid == INT64_MAX) { - ASSERT(writer->ctx->hasData == false); - goto _exit; + taosThreadRwlockUnlock(&writer[0]->tsdb->rwLock); } - code = tsdbSnapWriteDoWriteTimeSeriesRow(writer, row); - TSDB_CHECK_CODE(code, lino, _exit); + tsdbIterMergerClose(&writer[0]->ctx->tombIterMerger); + tsdbIterMergerClose(&writer[0]->ctx->dataIterMerger); + TARRAY2_DESTROY(writer[0]->ctx->tombIterArr, tsdbIterClose); + TARRAY2_DESTROY(writer[0]->ctx->dataIterArr, tsdbIterClose); + TARRAY2_DESTROY(writer[0]->ctx->sttReaderArr, tsdbSttFileReaderClose); + tsdbDataFileReaderClose(&writer[0]->ctx->dataReader); -_exit: - if (code) { - TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code); - } - return code; -} + TARRAY2_DESTROY(writer[0]->fopArr, NULL); + tsdbFSDestroyCopySnapshot(&writer[0]->fsetArr); -static int32_t tsdbSnapWriteFileSetBegin(STsdbSnapWriter* writer, int32_t fid) { - int32_t code = 0; - int32_t lino = 0; - - // TODO - -_exit: - if (code) { - TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code); + for (int32_t i = 0; i < ARRAY_SIZE(writer[0]->aBuf); ++i) { + tFree(writer[0]->aBuf[i]); } - return code; -} - -static int32_t tsdbSnapWriteTombRecord(STsdbSnapWriter* writer, const STombRecord* record) { - int32_t code = 0; - int32_t lino = 0; - // TODO + taosMemoryFree(writer[0]); + writer[0] = NULL; _exit: if (code) { - TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code); - } - return code; -} - -static int32_t tsdbSnapWriteFileSetEnd(STsdbSnapWriter* writer) { - if (!writer->ctx->fsetWriteBegin) return 0; - - int32_t code = 0; - int32_t lino = 0; - - // TODO - SRowInfo row = { - .suid = INT64_MAX, - .uid = INT64_MAX, - }; - - code = tsdbSnapWriteTimeSeriesRow(writer, &row); - TSDB_CHECK_CODE(code, lino, _exit); - - STombRecord record = { - .suid = INT64_MAX, - .uid = INT64_MAX, - }; - - code = tsdbSnapWriteTombRecord(writer, &record); - TSDB_CHECK_CODE(code, lino, _exit); - - // close write - code = tsdbSttFileWriterClose(&writer->sttWriter, 0, writer->fopArr); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbDataFileWriterClose(&writer->dataWriter, 0, writer->fopArr); - TSDB_CHECK_CODE(code, lino, _exit); - -_exit: - if (code) { - TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code); - } - return code; -} - -static int32_t tsdbSnapWriteTimeSeriesData(STsdbSnapWriter* writer, SSnapDataHdr* hdr) { - int32_t code = 0; - int32_t lino = 0; - - SBlockData blockData[1] = {0}; - - code = tDecmprBlockData(hdr->data, hdr->size, blockData, writer->aBuf); - TSDB_CHECK_CODE(code, lino, _exit); - - int32_t fid = tsdbKeyFid(blockData->aTSKEY[0], writer->minutes, writer->precision); - if (fid != writer->ctx->fid) { - code = tsdbSnapWriteFileSetEnd(writer); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbSnapWriteFileSetBegin(writer, fid); - TSDB_CHECK_CODE(code, lino, _exit); - } - - for (int32_t i = 0; i < blockData->nRow; ++i) { - SRowInfo rowInfo = { - .suid = blockData->suid, - .uid = blockData->uid ? blockData->uid : blockData->aUid[i], - .row = tsdbRowFromBlockData(blockData, i), - }; - - code = tsdbSnapWriteTimeSeriesRow(writer, &rowInfo); - TSDB_CHECK_CODE(code, lino, _exit); - } - -_exit: - if (code) { - TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code); + TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); } else { - tsdbDebug("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64 " nRow:%d", TD_VID(writer->tsdb->pVnode), __func__, - blockData->suid, blockData->uid, blockData->nRow); - } - tBlockDataDestroy(blockData); - return code; -} - -static int32_t tsdbSnapWriteDecmprTombBlock(SSnapDataHdr* hdr, STombBlock* tombBlock) { - int32_t code = 0; - int32_t lino = 0; - - // TODO - -_exit: - return code; -} - -static int32_t tsdbSnapWriteTombData(STsdbSnapWriter* writer, SSnapDataHdr* hdr) { - int32_t code = 0; - int32_t lino = 0; - - STombBlock tombBlock[1] = {0}; - - code = tsdbSnapWriteDecmprTombBlock(hdr, tombBlock); - TSDB_CHECK_CODE(code, lino, _exit); - - for (int32_t i = 0; i < TOMB_BLOCK_SIZE(tombBlock); ++i) { - STombRecord record; - tTombBlockGet(tombBlock, i, &record); - - code = tsdbSnapWriteTombRecord(writer, &record); - TSDB_CHECK_CODE(code, lino, _exit); - } - - tTombBlockDestroy(tombBlock); - -_exit: - if (code) { - TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code); + tsdbInfo("vgId:%d %s done", TD_VID(tsdb->pVnode), __func__); } return code; }