diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 5a2e462c8cd69598ee8642c7109983c97c034294..2efb00ae32c510211a0ac0d6c44450fa1b48464f 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -202,6 +202,7 @@ int32_t tsdbCmprColData(SColData *pColData, int8_t cmprAlg, SBlockCol *pBlockCol uint8_t **ppBuf); int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, int32_t nVal, SColData *pColData, uint8_t **ppBuf); +int32_t tRowInfoCmprFn(const void *p1, const void *p2); // tsdbMemTable ============================================================================================== // SMemTable int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 75367883f1d82edf349bbccbee5552dcf299a003..2501af7f04203f895513cc22aacbe63d0fb64b8d 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -247,7 +247,7 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader); int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData); // STsdbSnapWriter ======================================== int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWriter** ppWriter); -int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData); +int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, SSnapDataHdr* pHdr); int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* pWriter); int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback); // STqSnapshotReader == diff --git a/source/dnode/vnode/src/sma/smaSnapshot.c b/source/dnode/vnode/src/sma/smaSnapshot.c index 34f884f9f95776be10f1416fdf50f99a8fafce6e..0a6fac0fe7c2d6c724855907e61f498a82abbd57 100644 --- a/source/dnode/vnode/src/sma/smaSnapshot.c +++ b/source/dnode/vnode/src/sma/smaSnapshot.c @@ -423,10 +423,10 @@ int32_t rsmaSnapWrite(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) // rsma1/rsma2 if (pHdr->type == SNAP_DATA_RSMA1) { pHdr->type = SNAP_DATA_TSDB; - code = tsdbSnapWrite(pWriter->pDataWriter[0], pData, nData); + code = tsdbSnapWrite(pWriter->pDataWriter[0], pHdr); } else if (pHdr->type == SNAP_DATA_RSMA2) { pHdr->type = SNAP_DATA_TSDB; - code = tsdbSnapWrite(pWriter->pDataWriter[1], pData, nData); + code = tsdbSnapWrite(pWriter->pDataWriter[1], pHdr); } else if (pHdr->type == SNAP_DATA_QTASK) { code = rsmaSnapWriteQTaskInfo(pWriter, pData, nData); } else { diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index 08d52554c6cf7452f201e745afe2a99c632dd77f..befaf4a3e0c2d3883e732e2717e28f3c9d59e1c4 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -15,274 +15,628 @@ #include "tsdb.h" -// STsdbSnapReader ======================================== -typedef enum { SNAP_DATA_FILE_ITER = 0, SNAP_STT_FILE_ITER } EFIterT; +extern int32_t tsdbReadDataBlockEx(SDataFReader* pReader, SDataBlk* pDataBlk, SBlockData* pBlockData); +extern int32_t tsdbUpdateTableSchema(SMeta* pMeta, int64_t suid, int64_t uid, SSkmInfo* pSkmInfo); +extern int32_t tsdbWriteDataBlock(SDataFWriter* pWriter, SBlockData* pBlockData, SMapData* mDataBlk, int8_t cmprAlg); +extern int32_t tsdbWriteSttBlock(SDataFWriter* pWriter, SBlockData* pBlockData, SArray* aSttBlk, int8_t cmprAlg); + +// STsdbDataIter2 ======================================== +#define TSDB_MEM_TABLE_DATA_ITER 0 +#define TSDB_DATA_FILE_DATA_ITER 1 +#define TSDB_STT_FILE_DATA_ITER 2 +#define TSDB_TOMB_FILE_DATA_ITER 3 + +typedef struct STsdbDataIter2 STsdbDataIter2; +typedef struct STsdbFilterInfo STsdbFilterInfo; + typedef struct { - SRBTreeNode n; - SRowInfo rInfo; - EFIterT type; + int64_t suid; + int64_t uid; + SDelData delData; +} SDelInfo; + +struct STsdbDataIter2 { + STsdbDataIter2* next; + SRBTreeNode rbtn; + + int32_t type; + SRowInfo rowInfo; + SDelInfo delInfo; union { + // TSDB_MEM_TABLE_DATA_ITER struct { - SArray* aBlockIdx; - int32_t iBlockIdx; - SBlockIdx* pBlockIdx; - SMapData mBlock; - int32_t iBlock; - }; // .data file + SMemTable* pMemTable; + } mIter; + + // TSDB_DATA_FILE_DATA_ITER struct { - int32_t iStt; - SArray* aSttBlk; - int32_t iSttBlk; - }; // .stt file + SDataFReader* pReader; + SArray* aBlockIdx; // SArray + SMapData mDataBlk; + SBlockData bData; + int32_t iBlockIdx; + int32_t iDataBlk; + int32_t iRow; + } dIter; + + // TSDB_STT_FILE_DATA_ITER + struct { + SDataFReader* pReader; + int32_t iStt; + SArray* aSttBlk; + SBlockData bData; + int32_t iSttBlk; + int32_t iRow; + } sIter; + // TSDB_TOMB_FILE_DATA_ITER + struct { + SDelFReader* pReader; + SArray* aDelIdx; + SArray* aDelData; + int32_t iDelIdx; + int32_t iDelData; + } tIter; }; - SBlockData bData; - int32_t iRow; -} SFDataIter; +}; -struct STsdbSnapReader { - STsdb* pTsdb; +#define TSDB_FILTER_FLAG_BY_VERSION 0x1 +struct STsdbFilterInfo { + int32_t flag; int64_t sver; int64_t ever; - STsdbFS fs; - int8_t type; - // for data file - int8_t dataDone; - int32_t fid; - SDataFReader* pDataFReader; - SFDataIter* pIter; - SRBTree rbt; - SFDataIter aFDataIter[TSDB_MAX_STT_TRIGGER + 1]; - SBlockData bData; - SSkmInfo skmTable; - // for del file - int8_t delDone; - SDelFReader* pDelFReader; - SArray* aDelIdx; // SArray - int32_t iDelIdx; - SArray* aDelData; // SArray - uint8_t* aBuf[5]; }; -extern int32_t tRowInfoCmprFn(const void* p1, const void* p2); -extern int32_t tsdbReadDataBlockEx(SDataFReader* pReader, SDataBlk* pDataBlk, SBlockData* pBlockData); -extern int32_t tsdbUpdateTableSchema(SMeta* pMeta, int64_t suid, int64_t uid, SSkmInfo* pSkmInfo); +#define TSDB_RBTN_TO_DATA_ITER(pNode) ((STsdbDataIter2*)(((char*)pNode) - offsetof(STsdbDataIter2, rbtn))) -static int32_t tFDataIterCmprFn(const SRBTreeNode* pNode1, const SRBTreeNode* pNode2) { - SFDataIter* pIter1 = (SFDataIter*)(((uint8_t*)pNode1) - offsetof(SFDataIter, n)); - SFDataIter* pIter2 = (SFDataIter*)(((uint8_t*)pNode2) - offsetof(SFDataIter, n)); +/* open */ +static int32_t tsdbOpenDataFileDataIter(SDataFReader* pReader, STsdbDataIter2** ppIter) { + int32_t code = 0; + int32_t lino = 0; - return tRowInfoCmprFn(&pIter1->rInfo, &pIter2->rInfo); + // create handle + STsdbDataIter2* pIter = (STsdbDataIter2*)taosMemoryCalloc(1, sizeof(*pIter)); + if (pIter == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + pIter->type = TSDB_DATA_FILE_DATA_ITER; + pIter->dIter.pReader = pReader; + if ((pIter->dIter.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx))) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + code = tBlockDataCreate(&pIter->dIter.bData); + TSDB_CHECK_CODE(code, lino, _exit); + + pIter->dIter.iBlockIdx = 0; + pIter->dIter.iDataBlk = 0; + pIter->dIter.iRow = 0; + + // read data + code = tsdbReadBlockIdx(pReader, pIter->dIter.aBlockIdx); + TSDB_CHECK_CODE(code, lino, _exit); + + if (taosArrayGetSize(pIter->dIter.aBlockIdx) == 0) goto _clear; + +_exit: + if (code) { + if (pIter) { + _clear: + tBlockDataDestroy(&pIter->dIter.bData, 1); + taosArrayDestroy(pIter->dIter.aBlockIdx); + taosMemoryFree(pIter); + pIter = NULL; + } + } + *ppIter = pIter; + return code; } -static int32_t tsdbSnapReadOpenFile(STsdbSnapReader* pReader) { +static int32_t tsdbOpenSttFileDataIter(SDataFReader* pReader, int32_t iStt, STsdbDataIter2** ppIter) { int32_t code = 0; int32_t lino = 0; - SDFileSet dFileSet = {.fid = pReader->fid}; - SDFileSet* pSet = taosArraySearch(pReader->fs.aDFileSet, &dFileSet, tDFileSetCmprFn, TD_GT); - if (pSet == NULL) return code; + // create handle + STsdbDataIter2* pIter = (STsdbDataIter2*)taosMemoryCalloc(1, sizeof(*pIter)); + if (pIter == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } - pReader->fid = pSet->fid; - code = tsdbDataFReaderOpen(&pReader->pDataFReader, pReader->pTsdb, pSet); - TSDB_CHECK_CODE(code, lino, _exit); + pIter->type = TSDB_STT_FILE_DATA_ITER; + pIter->sIter.pReader = pReader; + pIter->sIter.iStt = iStt; + pIter->sIter.aSttBlk = taosArrayInit(0, sizeof(SSttBlk)); + if (pIter->sIter.aSttBlk == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } - pReader->pIter = NULL; - tRBTreeCreate(&pReader->rbt, tFDataIterCmprFn); + code = tBlockDataCreate(&pIter->sIter.bData); + TSDB_CHECK_CODE(code, lino, _exit); - // .data file - SFDataIter* pIter = &pReader->aFDataIter[0]; - pIter->type = SNAP_DATA_FILE_ITER; + pIter->sIter.iSttBlk = 0; + pIter->sIter.iRow = 0; - code = tsdbReadBlockIdx(pReader->pDataFReader, pIter->aBlockIdx); + // read data + code = tsdbReadSttBlk(pReader, iStt, pIter->sIter.aSttBlk); TSDB_CHECK_CODE(code, lino, _exit); - for (pIter->iBlockIdx = 0; pIter->iBlockIdx < taosArrayGetSize(pIter->aBlockIdx); pIter->iBlockIdx++) { - pIter->pBlockIdx = (SBlockIdx*)taosArrayGet(pIter->aBlockIdx, pIter->iBlockIdx); + if (taosArrayGetSize(pIter->sIter.aSttBlk) == 0) goto _clear; + +_exit: + if (code) { + if (pIter) { + _clear: + taosArrayDestroy(pIter->sIter.aSttBlk); + tBlockDataDestroy(&pIter->sIter.bData, 1); + taosMemoryFree(pIter); + pIter = NULL; + } + } + *ppIter = pIter; + return code; +} + +static int32_t tsdbOpenTombFileDataIter(SDelFReader* pReader, STsdbDataIter2** ppIter) { + int32_t code = 0; + int32_t lino = 0; - code = tsdbReadDataBlk(pReader->pDataFReader, pIter->pBlockIdx, &pIter->mBlock); + STsdbDataIter2* pIter = (STsdbDataIter2*)taosMemoryCalloc(1, sizeof(*pIter)); + if (pIter == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; TSDB_CHECK_CODE(code, lino, _exit); + } + pIter->type = TSDB_TOMB_FILE_DATA_ITER; + + pIter->tIter.pReader = pReader; + if ((pIter->tIter.aDelIdx = taosArrayInit(0, sizeof(SDelIdx))) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + if ((pIter->tIter.aDelData = taosArrayInit(0, sizeof(SDelData))) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } - for (pIter->iBlock = 0; pIter->iBlock < pIter->mBlock.nItem; pIter->iBlock++) { - SDataBlk dataBlk; - tMapDataGetItemByIdx(&pIter->mBlock, pIter->iBlock, &dataBlk, tGetDataBlk); + code = tsdbReadDelIdx(pReader, pIter->tIter.aDelIdx); + TSDB_CHECK_CODE(code, lino, _exit); - if (dataBlk.minVer > pReader->ever || dataBlk.maxVer < pReader->sver) continue; + if (taosArrayGetSize(pIter->tIter.aDelIdx) == 0) goto _clear; - code = tsdbReadDataBlockEx(pReader->pDataFReader, &dataBlk, &pIter->bData); - TSDB_CHECK_CODE(code, lino, _exit); + pIter->tIter.iDelIdx = 0; + pIter->tIter.iDelData = 0; + +_exit: + if (code) { + if (pIter) { + _clear: + taosArrayDestroy(pIter->tIter.aDelIdx); + taosArrayDestroy(pIter->tIter.aDelData); + taosMemoryFree(pIter); + pIter = NULL; + } + } + *ppIter = pIter; + return code; +} + +/* close */ +static void tsdbCloseDataFileDataIter(STsdbDataIter2* pIter) { + tBlockDataDestroy(&pIter->dIter.bData, 1); + tMapDataClear(&pIter->dIter.mDataBlk); + taosArrayDestroy(pIter->dIter.aBlockIdx); + taosMemoryFree(pIter); +} + +static void tsdbCloseSttFileDataIter(STsdbDataIter2* pIter) { + tBlockDataDestroy(&pIter->sIter.bData, 1); + taosArrayDestroy(pIter->sIter.aSttBlk); + taosMemoryFree(pIter); +} + +static void tsdbCloseTombFileDataIter(STsdbDataIter2* pIter) { + taosArrayDestroy(pIter->tIter.aDelData); + taosArrayDestroy(pIter->tIter.aDelIdx); + taosMemoryFree(pIter); +} + +static void tsdbCloseDataIter2(STsdbDataIter2* pIter) { + if (pIter->type == TSDB_MEM_TABLE_DATA_ITER) { + ASSERT(0); + } else if (pIter->type == TSDB_DATA_FILE_DATA_ITER) { + tsdbCloseDataFileDataIter(pIter); + } else if (pIter->type == TSDB_STT_FILE_DATA_ITER) { + tsdbCloseSttFileDataIter(pIter); + } else if (pIter->type == TSDB_TOMB_FILE_DATA_ITER) { + tsdbCloseTombFileDataIter(pIter); + } else { + ASSERT(0); + } +} - ASSERT(pIter->pBlockIdx->suid == pIter->bData.suid); - ASSERT(pIter->pBlockIdx->uid == pIter->bData.uid); +/* cmpr */ +static int32_t tsdbDataIterCmprFn(const SRBTreeNode* pNode1, const SRBTreeNode* pNode2) { + STsdbDataIter2* pIter1 = TSDB_RBTN_TO_DATA_ITER(pNode1); + STsdbDataIter2* pIter2 = TSDB_RBTN_TO_DATA_ITER(pNode2); + return tRowInfoCmprFn(&pIter1->rowInfo, &pIter2->rowInfo); +} - for (pIter->iRow = 0; pIter->iRow < pIter->bData.nRow; pIter->iRow++) { - int64_t rowVer = pIter->bData.aVersion[pIter->iRow]; +/* seek */ - if (rowVer >= pReader->sver && rowVer <= pReader->ever) { - pIter->rInfo.suid = pIter->pBlockIdx->suid; - pIter->rInfo.uid = pIter->pBlockIdx->uid; - pIter->rInfo.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow); - goto _add_iter_and_break; +/* iter next */ +static int32_t tsdbDataFileDataIterNext(STsdbDataIter2* pIter, STsdbFilterInfo* pFilterInfo) { + int32_t code = 0; + int32_t lino = 0; + + for (;;) { + while (pIter->dIter.iRow < pIter->dIter.bData.nRow) { + if (pFilterInfo) { + if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_VERSION) { + if (pIter->dIter.bData.aVersion[pIter->dIter.iRow] < pFilterInfo->sver || + pIter->dIter.bData.aVersion[pIter->dIter.iRow] > pFilterInfo->ever) { + pIter->dIter.iRow++; + continue; + } } } + + pIter->rowInfo.suid = pIter->dIter.bData.suid; + pIter->rowInfo.uid = pIter->dIter.bData.uid; + pIter->rowInfo.row = tsdbRowFromBlockData(&pIter->dIter.bData, pIter->dIter.iRow); + pIter->dIter.iRow++; + goto _exit; } - continue; + for (;;) { + while (pIter->dIter.iDataBlk < pIter->dIter.mDataBlk.nItem) { + SDataBlk dataBlk; + tMapDataGetItemByIdx(&pIter->dIter.mDataBlk, pIter->dIter.iDataBlk, &dataBlk, tGetDataBlk); + + // filter + if (pFilterInfo) { + if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_VERSION) { + if (pFilterInfo->sver > dataBlk.maxVer || pFilterInfo->ever < dataBlk.minVer) { + pIter->dIter.iDataBlk++; + continue; + } + } + } - _add_iter_and_break: - tRBTreePut(&pReader->rbt, (SRBTreeNode*)pIter); - break; - } + code = tsdbReadDataBlockEx(pIter->dIter.pReader, &dataBlk, &pIter->dIter.bData); + TSDB_CHECK_CODE(code, lino, _exit); - // .stt file - pIter = &pReader->aFDataIter[1]; - for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) { - pIter->type = SNAP_STT_FILE_ITER; - pIter->iStt = iStt; + pIter->dIter.iDataBlk++; + pIter->dIter.iRow = 0; - code = tsdbReadSttBlk(pReader->pDataFReader, iStt, pIter->aSttBlk); - TSDB_CHECK_CODE(code, lino, _exit); + break; + } - for (pIter->iSttBlk = 0; pIter->iSttBlk < taosArrayGetSize(pIter->aSttBlk); pIter->iSttBlk++) { - SSttBlk* pSttBlk = (SSttBlk*)taosArrayGet(pIter->aSttBlk, pIter->iSttBlk); + if (pIter->dIter.iRow < pIter->dIter.bData.nRow) break; - if (pSttBlk->minVer > pReader->ever) continue; - if (pSttBlk->maxVer < pReader->sver) continue; + for (;;) { + if (pIter->dIter.iBlockIdx < taosArrayGetSize(pIter->dIter.aBlockIdx)) { + SBlockIdx* pBlockIdx = taosArrayGet(pIter->dIter.aBlockIdx, pIter->dIter.iBlockIdx); - code = tsdbReadSttBlockEx(pReader->pDataFReader, iStt, pSttBlk, &pIter->bData); - TSDB_CHECK_CODE(code, lino, _exit); + code = tsdbReadDataBlk(pIter->dIter.pReader, pBlockIdx, &pIter->dIter.mDataBlk); + TSDB_CHECK_CODE(code, lino, _exit); - for (pIter->iRow = 0; pIter->iRow < pIter->bData.nRow; pIter->iRow++) { - int64_t rowVer = pIter->bData.aVersion[pIter->iRow]; + pIter->dIter.iBlockIdx++; + pIter->dIter.iDataBlk = 0; - if (rowVer >= pReader->sver && rowVer <= pReader->ever) { - pIter->rInfo.suid = pIter->bData.suid; - pIter->rInfo.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[pIter->iRow]; - pIter->rInfo.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow); - goto _add_iter; + break; + } else { + pIter->rowInfo = (SRowInfo){0}; + goto _exit; } } } - - continue; - - _add_iter: - tRBTreePut(&pReader->rbt, (SRBTreeNode*)pIter); - pIter++; } _exit: if (code) { - tsdbError("vgId:%d, %s failed since %s", TD_VID(pReader->pTsdb->pVnode), __func__, tstrerror(code)); - } else { - tsdbInfo("vgId:%d, %s done, path:%s, fid:%d", TD_VID(pReader->pTsdb->pVnode), __func__, pReader->pTsdb->path, - pReader->fid); + tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); } return code; } -static int32_t tsdbSnapNextRow(STsdbSnapReader* pReader) { +static int32_t tsdbSttFileDataIterNext(STsdbDataIter2* pIter, STsdbFilterInfo* pFilterInfo) { int32_t code = 0; + int32_t lino = 0; - if (pReader->pIter) { - SFDataIter* pIter = NULL; - while (true) { - _find_row: - pIter = pReader->pIter; - for (pIter->iRow++; pIter->iRow < pIter->bData.nRow; pIter->iRow++) { - int64_t rowVer = pIter->bData.aVersion[pIter->iRow]; - - if (rowVer >= pReader->sver && rowVer <= pReader->ever) { - pIter->rInfo.suid = pIter->bData.suid; - pIter->rInfo.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[pIter->iRow]; - pIter->rInfo.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow); - goto _out; + for (;;) { + while (pIter->sIter.iRow < pIter->sIter.bData.nRow) { + if (pFilterInfo) { + if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_VERSION) { + if (pFilterInfo->sver > pIter->sIter.bData.aVersion[pIter->sIter.iRow] || + pFilterInfo->ever < pIter->sIter.bData.aVersion[pIter->sIter.iRow]) { + pIter->sIter.iRow++; + continue; + } } } - if (pIter->type == SNAP_DATA_FILE_ITER) { - while (true) { - for (pIter->iBlock++; pIter->iBlock < pIter->mBlock.nItem; pIter->iBlock++) { - SDataBlk dataBlk; - tMapDataGetItemByIdx(&pIter->mBlock, pIter->iBlock, &dataBlk, tGetDataBlk); - - if (dataBlk.minVer > pReader->ever || dataBlk.maxVer < pReader->sver) continue; + pIter->rowInfo.suid = pIter->sIter.bData.suid; + pIter->rowInfo.uid = pIter->sIter.bData.uid ? pIter->sIter.bData.uid : pIter->sIter.bData.aUid[pIter->sIter.iRow]; + pIter->rowInfo.row = tsdbRowFromBlockData(&pIter->sIter.bData, pIter->sIter.iRow); + pIter->sIter.iRow++; + goto _exit; + } - code = tsdbReadDataBlockEx(pReader->pDataFReader, &dataBlk, &pIter->bData); - if (code) goto _err; + for (;;) { + if (pIter->sIter.iSttBlk < taosArrayGetSize(pIter->sIter.aSttBlk)) { + SSttBlk* pSttBlk = taosArrayGet(pIter->sIter.aSttBlk, pIter->sIter.iSttBlk); - pIter->iRow = -1; - goto _find_row; + if (pFilterInfo) { + if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_VERSION) { + if (pFilterInfo->sver > pSttBlk->maxVer || pFilterInfo->ever < pSttBlk->minVer) { + pIter->sIter.iSttBlk++; + continue; + } } - - pIter->iBlockIdx++; - if (pIter->iBlockIdx >= taosArrayGetSize(pIter->aBlockIdx)) break; - - pIter->pBlockIdx = (SBlockIdx*)taosArrayGet(pIter->aBlockIdx, pIter->iBlockIdx); - code = tsdbReadDataBlk(pReader->pDataFReader, pIter->pBlockIdx, &pIter->mBlock); - if (code) goto _err; - pIter->iBlock = -1; } - pReader->pIter = NULL; + code = tsdbReadSttBlockEx(pIter->sIter.pReader, pIter->sIter.iStt, pSttBlk, &pIter->sIter.bData); + TSDB_CHECK_CODE(code, lino, _exit); + + pIter->sIter.iRow = 0; + pIter->sIter.iSttBlk++; break; - } else if (pIter->type == SNAP_STT_FILE_ITER) { - for (pIter->iSttBlk++; pIter->iSttBlk < taosArrayGetSize(pIter->aSttBlk); pIter->iSttBlk++) { - SSttBlk* pSttBlk = (SSttBlk*)taosArrayGet(pIter->aSttBlk, pIter->iSttBlk); + } else { + pIter->rowInfo = (SRowInfo){0}; + goto _exit; + } + } + } - if (pSttBlk->minVer > pReader->ever || pSttBlk->maxVer < pReader->sver) continue; +_exit: + if (code) { + tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; +} - code = tsdbReadSttBlockEx(pReader->pDataFReader, pIter->iStt, pSttBlk, &pIter->bData); - if (code) goto _err; +static int32_t tsdbTombFileDataIterNext(STsdbDataIter2* pIter, STsdbFilterInfo* pFilterInfo) { + int32_t code = 0; + int32_t lino = 0; - pIter->iRow = -1; - goto _find_row; + for (;;) { + while (pIter->tIter.iDelData < taosArrayGetSize(pIter->tIter.aDelData)) { + SDelData* pDelData = taosArrayGet(pIter->tIter.aDelData, pIter->tIter.iDelData); + + if (pFilterInfo) { + if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_VERSION) { + if (pFilterInfo->sver > pDelData->version || pFilterInfo->ever < pDelData->version) { + pIter->tIter.iDelData++; + continue; + } } + } + + pIter->delInfo.delData = *pDelData; + pIter->tIter.iDelData++; + goto _exit; + } + + for (;;) { + if (pIter->tIter.iDelIdx < taosArrayGetSize(pIter->tIter.aDelIdx)) { + SDelIdx* pDelIdx = taosArrayGet(pIter->tIter.aDelIdx, pIter->tIter.iDelIdx); + + code = tsdbReadDelData(pIter->tIter.pReader, pDelIdx, pIter->tIter.aDelData); + TSDB_CHECK_CODE(code, lino, _exit); - pReader->pIter = NULL; + pIter->delInfo.suid = pDelIdx->suid; + pIter->delInfo.uid = pDelIdx->uid; + pIter->tIter.iDelData = 0; + pIter->tIter.iDelIdx++; break; } else { - ASSERT(0); + pIter->delInfo = (SDelInfo){0}; + goto _exit; } } + } - _out: - pIter = (SFDataIter*)tRBTreeMin(&pReader->rbt); - if (pReader->pIter && pIter) { - int32_t c = tRowInfoCmprFn(&pReader->pIter->rInfo, &pIter->rInfo); - if (c > 0) { - tRBTreePut(&pReader->rbt, (SRBTreeNode*)pReader->pIter); - pReader->pIter = NULL; - } else { - ASSERT(c); - } +_exit: + if (code) { + tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; +} + +static int32_t tsdbDataIterNext2(STsdbDataIter2* pIter, STsdbFilterInfo* pFilterInfo) { + int32_t code = 0; + + if (pIter->type == TSDB_MEM_TABLE_DATA_ITER) { + ASSERT(0); + return code; + } else if (pIter->type == TSDB_DATA_FILE_DATA_ITER) { + return tsdbDataFileDataIterNext(pIter, pFilterInfo); + } else if (pIter->type == TSDB_STT_FILE_DATA_ITER) { + return tsdbSttFileDataIterNext(pIter, pFilterInfo); + } else if (pIter->type == TSDB_TOMB_FILE_DATA_ITER) { + return tsdbTombFileDataIterNext(pIter, pFilterInfo); + } else { + ASSERT(0); + return code; + } +} + +/* get */ + +// STsdbSnapReader ======================================== +struct STsdbSnapReader { + STsdb* pTsdb; + int64_t sver; + int64_t ever; + int8_t type; + uint8_t* aBuf[5]; + + STsdbFS fs; + TABLEID tbid; + SSkmInfo skmTable; + + // timeseries data + int8_t dataDone; + int32_t fid; + + SDataFReader* pDataFReader; + STsdbDataIter2* iterList; + STsdbDataIter2* pIter; + SRBTree rbt; + SBlockData bData; + + // tombstone data + int8_t delDone; + SDelFReader* pDelFReader; + STsdbDataIter2* pTIter; + SArray* aDelData; +}; + +static int32_t tsdbSnapReadFileDataStart(STsdbSnapReader* pReader) { + int32_t code = 0; + int32_t lino = 0; + + SDFileSet* pSet = taosArraySearch(pReader->fs.aDFileSet, &(SDFileSet){.fid = pReader->fid}, tDFileSetCmprFn, TD_GT); + if (pSet == NULL) { + pReader->fid = INT32_MAX; + goto _exit; + } + + pReader->fid = pSet->fid; + + tRBTreeCreate(&pReader->rbt, tsdbDataIterCmprFn); + + code = tsdbDataFReaderOpen(&pReader->pDataFReader, pReader->pTsdb, pSet); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbOpenDataFileDataIter(pReader->pDataFReader, &pReader->pIter); + TSDB_CHECK_CODE(code, lino, _exit); + + if (pReader->pIter) { + // iter to next with filter info (sver, ever) + code = tsdbDataIterNext2(pReader->pIter, + &(STsdbFilterInfo){.flag = TSDB_FILTER_FLAG_BY_VERSION, // flag + .sver = pReader->sver, + .ever = pReader->ever}); + TSDB_CHECK_CODE(code, lino, _exit); + + if (pReader->pIter->rowInfo.suid || pReader->pIter->rowInfo.uid) { + // add to rbtree + tRBTreePut(&pReader->rbt, &pReader->pIter->rbtn); + + // add to iterList + pReader->pIter->next = pReader->iterList; + pReader->iterList = pReader->pIter; + } else { + tsdbCloseDataIter2(pReader->pIter); } } - if (pReader->pIter == NULL) { - pReader->pIter = (SFDataIter*)tRBTreeMin(&pReader->rbt); + for (int32_t iStt = 0; iStt < pSet->nSttF; ++iStt) { + code = tsdbOpenSttFileDataIter(pReader->pDataFReader, iStt, &pReader->pIter); + TSDB_CHECK_CODE(code, lino, _exit); + if (pReader->pIter) { - tRBTreeDrop(&pReader->rbt, (SRBTreeNode*)pReader->pIter); + // iter to valid row + code = tsdbDataIterNext2(pReader->pIter, + &(STsdbFilterInfo){.flag = TSDB_FILTER_FLAG_BY_VERSION, // flag + .sver = pReader->sver, + .ever = pReader->ever}); + TSDB_CHECK_CODE(code, lino, _exit); + + if (pReader->pIter->rowInfo.suid || pReader->pIter->rowInfo.uid) { + // add to rbtree + tRBTreePut(&pReader->rbt, &pReader->pIter->rbtn); + + // add to iterList + pReader->pIter->next = pReader->iterList; + pReader->iterList = pReader->pIter; + } else { + tsdbCloseDataIter2(pReader->pIter); + } } } - return code; + pReader->pIter = NULL; -_err: +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code)); + } else { + tsdbInfo("vgId:%d %s done, fid:%d", TD_VID(pReader->pTsdb->pVnode), __func__, pReader->fid); + } return code; } -static SRowInfo* tsdbSnapGetRow(STsdbSnapReader* pReader) { +static void tsdbSnapReadFileDataEnd(STsdbSnapReader* pReader) { + while (pReader->iterList) { + STsdbDataIter2* pIter = pReader->iterList; + pReader->iterList = pIter->next; + tsdbCloseDataIter2(pIter); + } + + tsdbDataFReaderClose(&pReader->pDataFReader); +} + +static int32_t tsdbSnapReadNextRow(STsdbSnapReader* pReader, SRowInfo** ppRowInfo) { + int32_t code = 0; + int32_t lino = 0; + if (pReader->pIter) { - return &pReader->pIter->rInfo; - } else { - tsdbSnapNextRow(pReader); + code = tsdbDataIterNext2(pReader->pIter, &(STsdbFilterInfo){.flag = TSDB_FILTER_FLAG_BY_VERSION, // flag + .sver = pReader->sver, + .ever = pReader->ever}); + TSDB_CHECK_CODE(code, lino, _exit); + + if (pReader->pIter->rowInfo.suid == 0 && pReader->pIter->rowInfo.uid == 0) { + pReader->pIter = NULL; + } else { + SRBTreeNode* pNode = tRBTreeMin(&pReader->rbt); + if (pNode) { + int32_t c = tsdbDataIterCmprFn(&pReader->pIter->rbtn, pNode); + if (c > 0) { + tRBTreePut(&pReader->rbt, &pReader->pIter->rbtn); + pReader->pIter = NULL; + } else if (c == 0) { + ASSERT(0); + } + } + } + } + + if (pReader->pIter == NULL) { + SRBTreeNode* pNode = tRBTreeMin(&pReader->rbt); + if (pNode) { + tRBTreeDrop(&pReader->rbt, pNode); + pReader->pIter = TSDB_RBTN_TO_DATA_ITER(pNode); + } + } + if (ppRowInfo) { if (pReader->pIter) { - return &pReader->pIter->rInfo; + *ppRowInfo = &pReader->pIter->rowInfo; } else { - return NULL; + *ppRowInfo = NULL; } } + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code)); + } + return code; +} + +static int32_t tsdbSnapReadGetRow(STsdbSnapReader* pReader, SRowInfo** ppRowInfo) { + if (pReader->pIter) { + *ppRowInfo = &pReader->pIter->rowInfo; + return 0; + } + + return tsdbSnapReadNextRow(pReader, ppRowInfo); } static int32_t tsdbSnapCmprData(STsdbSnapReader* pReader, uint8_t** ppData) { @@ -318,155 +672,213 @@ _exit: return code; } -static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) { +static int32_t tsdbSnapReadTimeSeriesData(STsdbSnapReader* pReader, uint8_t** ppData) { int32_t code = 0; int32_t lino = 0; STsdb* pTsdb = pReader->pTsdb; - while (true) { + tBlockDataClear(&pReader->bData); + + for (;;) { + // start a new file read if need if (pReader->pDataFReader == NULL) { - code = tsdbSnapReadOpenFile(pReader); + code = tsdbSnapReadFileDataStart(pReader); TSDB_CHECK_CODE(code, lino, _exit); } if (pReader->pDataFReader == NULL) break; - SRowInfo* pRowInfo = tsdbSnapGetRow(pReader); + SRowInfo* pRowInfo; + code = tsdbSnapReadGetRow(pReader, &pRowInfo); + TSDB_CHECK_CODE(code, lino, _exit); + if (pRowInfo == NULL) { - tsdbDataFReaderClose(&pReader->pDataFReader); + tsdbSnapReadFileDataEnd(pReader); continue; } - TABLEID id = {.suid = pRowInfo->suid, .uid = pRowInfo->uid}; - SBlockData* pBlockData = &pReader->bData; - - code = tsdbUpdateTableSchema(pTsdb->pVnode->pMeta, id.suid, id.uid, &pReader->skmTable); + code = tsdbUpdateTableSchema(pTsdb->pVnode->pMeta, pRowInfo->suid, pRowInfo->uid, &pReader->skmTable); TSDB_CHECK_CODE(code, lino, _exit); - code = tBlockDataInit(pBlockData, &id, pReader->skmTable.pTSchema, NULL, 0); + code = tBlockDataInit(&pReader->bData, (TABLEID*)pRowInfo, pReader->skmTable.pTSchema, NULL, 0); TSDB_CHECK_CODE(code, lino, _exit); - while (pRowInfo->suid == id.suid && pRowInfo->uid == id.uid) { - code = tBlockDataAppendRow(pBlockData, &pRowInfo->row, NULL, pRowInfo->uid); + do { + if (!TABLE_SAME_SCHEMA(pReader->bData.suid, pReader->bData.uid, pRowInfo->suid, pRowInfo->uid)) break; + + if (pReader->bData.uid && pReader->bData.uid != pRowInfo->uid) { + code = tRealloc((uint8_t**)&pReader->bData.aUid, sizeof(int64_t) * (pReader->bData.nRow + 1)); + TSDB_CHECK_CODE(code, lino, _exit); + + for (int32_t iRow = 0; iRow < pReader->bData.nRow; ++iRow) { + pReader->bData.aUid[iRow] = pReader->bData.uid; + } + pReader->bData.uid = 0; + } + + code = tBlockDataAppendRow(&pReader->bData, &pRowInfo->row, NULL, pRowInfo->uid); TSDB_CHECK_CODE(code, lino, _exit); - code = tsdbSnapNextRow(pReader); + code = tsdbSnapReadNextRow(pReader, &pRowInfo); TSDB_CHECK_CODE(code, lino, _exit); - pRowInfo = tsdbSnapGetRow(pReader); - if (pRowInfo == NULL) { - tsdbDataFReaderClose(&pReader->pDataFReader); - break; - } + if (pReader->bData.nRow >= 4096) break; + } while (pRowInfo); + + ASSERT(pReader->bData.nRow > 0); + + break; + } + + if (pReader->bData.nRow > 0) { + code = tsdbSnapCmprData(pReader, ppData); + TSDB_CHECK_CODE(code, lino, _exit); + } + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + } + return code; +} + +static int32_t tsdbSnapCmprTombData(STsdbSnapReader* pReader, uint8_t** ppData) { + int32_t code = 0; + int32_t lino = 0; + + int64_t size = sizeof(TABLEID); + for (int32_t iDelData = 0; iDelData < taosArrayGetSize(pReader->aDelData); ++iDelData) { + size += tPutDelData(NULL, taosArrayGet(pReader->aDelData, iDelData)); + } + + uint8_t* pData = (uint8_t*)taosMemoryMalloc(sizeof(SSnapDataHdr) + size); + if (pData == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + SSnapDataHdr* pHdr = (SSnapDataHdr*)pData; + pHdr->type = SNAP_DATA_DEL; + pHdr->size = size; + + TABLEID* pId = (TABLEID*)(pData + sizeof(SSnapDataHdr)); + *pId = pReader->tbid; + + size = sizeof(SSnapDataHdr) + sizeof(TABLEID); + for (int32_t iDelData = 0; iDelData < taosArrayGetSize(pReader->aDelData); ++iDelData) { + size += tPutDelData(pData + size, taosArrayGet(pReader->aDelData, iDelData)); + } + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code)); + if (pData) { + taosMemoryFree(pData); + pData = NULL; + } + } + *ppData = pData; + return code; +} - if (pBlockData->nRow >= 4096) break; - } +static void tsdbSnapReadGetTombData(STsdbSnapReader* pReader, SDelInfo** ppDelInfo) { + if (pReader->pTIter == NULL || (pReader->pTIter->delInfo.suid == 0 && pReader->pTIter->delInfo.uid == 0)) { + *ppDelInfo = NULL; + } else { + *ppDelInfo = &pReader->pTIter->delInfo; + } +} - code = tsdbSnapCmprData(pReader, ppData); - TSDB_CHECK_CODE(code, lino, _exit); +static int32_t tsdbSnapReadNextTombData(STsdbSnapReader* pReader, SDelInfo** ppDelInfo) { + int32_t code = 0; + int32_t lino = 0; - break; + code = tsdbDataIterNext2( + pReader->pTIter, + &(STsdbFilterInfo){.flag = TSDB_FILTER_FLAG_BY_VERSION, .sver = pReader->sver, .ever = pReader->ever}); + TSDB_CHECK_CODE(code, lino, _exit); + + if (ppDelInfo) { + tsdbSnapReadGetTombData(pReader, ppDelInfo); } _exit: if (code) { - tsdbError("vgId:%d, %s failed since %s, path:%s", TD_VID(pTsdb->pVnode), __func__, tstrerror(code), pTsdb->path); + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code)); } return code; } -static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) { +static int32_t tsdbSnapReadTombData(STsdbSnapReader* pReader, uint8_t** ppData) { int32_t code = 0; int32_t lino = 0; - STsdb* pTsdb = pReader->pTsdb; - SDelFile* pDelFile = pReader->fs.pDelFile; + STsdb* pTsdb = pReader->pTsdb; + // open tombstone data iter if need if (pReader->pDelFReader == NULL) { - if (pDelFile == NULL) { - goto _exit; - } + if (pReader->fs.pDelFile == NULL) goto _exit; // open - code = tsdbDelFReaderOpen(&pReader->pDelFReader, pDelFile, pTsdb); + code = tsdbDelFReaderOpen(&pReader->pDelFReader, pReader->fs.pDelFile, pTsdb); TSDB_CHECK_CODE(code, lino, _exit); - // read index - code = tsdbReadDelIdx(pReader->pDelFReader, pReader->aDelIdx); + code = tsdbOpenTombFileDataIter(pReader->pDelFReader, &pReader->pTIter); TSDB_CHECK_CODE(code, lino, _exit); - pReader->iDelIdx = 0; + if (pReader->pTIter) { + code = tsdbSnapReadNextTombData(pReader, NULL); + TSDB_CHECK_CODE(code, lino, _exit); + } } - while (true) { - if (pReader->iDelIdx >= taosArrayGetSize(pReader->aDelIdx)) { - tsdbDelFReaderClose(&pReader->pDelFReader); - break; - } + // loop to get tombstone data + SDelInfo* pDelInfo; + tsdbSnapReadGetTombData(pReader, &pDelInfo); - SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pReader->aDelIdx, pReader->iDelIdx); + if (pDelInfo == NULL) goto _exit; - pReader->iDelIdx++; + pReader->tbid = *(TABLEID*)pDelInfo; - code = tsdbReadDelData(pReader->pDelFReader, pDelIdx, pReader->aDelData); + if (pReader->aDelData) { + taosArrayClear(pReader->aDelData); + } else if ((pReader->aDelData = taosArrayInit(16, sizeof(SDelData))) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; TSDB_CHECK_CODE(code, lino, _exit); + } - int32_t size = 0; - for (int32_t iDelData = 0; iDelData < taosArrayGetSize(pReader->aDelData); iDelData++) { - SDelData* pDelData = (SDelData*)taosArrayGet(pReader->aDelData, iDelData); - - if (pDelData->version >= pReader->sver && pDelData->version <= pReader->ever) { - size += tPutDelData(NULL, pDelData); - } - } - if (size == 0) continue; - - // org data - size = sizeof(TABLEID) + size; - *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + size); - if (*ppData == NULL) { + while (pDelInfo && pDelInfo->suid == pReader->tbid.suid && pDelInfo->uid == pReader->tbid.uid) { + if (taosArrayPush(pReader->aDelData, &pDelInfo->delData) < 0) { code = TSDB_CODE_OUT_OF_MEMORY; TSDB_CHECK_CODE(code, lino, _exit); } - SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData); - pHdr->type = SNAP_DATA_DEL; - pHdr->size = size; - - TABLEID* pId = (TABLEID*)(&pHdr[1]); - pId->suid = pDelIdx->suid; - pId->uid = pDelIdx->uid; - int32_t n = sizeof(SSnapDataHdr) + sizeof(TABLEID); - for (int32_t iDelData = 0; iDelData < taosArrayGetSize(pReader->aDelData); iDelData++) { - SDelData* pDelData = (SDelData*)taosArrayGet(pReader->aDelData, iDelData); - - if (pDelData->version < pReader->sver) continue; - if (pDelData->version > pReader->ever) continue; - - n += tPutDelData((*ppData) + n, pDelData); - } - - tsdbInfo("vgId:%d, vnode snapshot tsdb read del data for %s, suid:%" PRId64 " uid:%" PRId64 " size:%d", - TD_VID(pTsdb->pVnode), pTsdb->path, pDelIdx->suid, pDelIdx->uid, size); + code = tsdbSnapReadNextTombData(pReader, &pDelInfo); + TSDB_CHECK_CODE(code, lino, _exit); + } - break; + // encode tombstone data + if (taosArrayGetSize(pReader->aDelData) > 0) { + code = tsdbSnapCmprTombData(pReader, ppData); + TSDB_CHECK_CODE(code, lino, _exit); } _exit: if (code) { - tsdbError("vgId:%d, %s failed since %s, path:%s", TD_VID(pTsdb->pVnode), __func__, tstrerror(code), pTsdb->path); + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + } else { + tsdbDebug("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__); } return code; } int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type, STsdbSnapReader** ppReader) { - int32_t code = 0; - int32_t lino = 0; - STsdbSnapReader* pReader = NULL; + int32_t code = 0; + int32_t lino = 0; // alloc - pReader = (STsdbSnapReader*)taosMemoryCalloc(1, sizeof(*pReader)); + STsdbSnapReader* pReader = (STsdbSnapReader*)taosMemoryCalloc(1, sizeof(*pReader)); if (pReader == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; TSDB_CHECK_CODE(code, lino, _exit); @@ -476,118 +888,79 @@ int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type pReader->ever = ever; pReader->type = type; - code = taosThreadRwlockRdlock(&pTsdb->rwLock); - if (code) { - code = TAOS_SYSTEM_ERROR(code); - TSDB_CHECK_CODE(code, lino, _exit); - } - + taosThreadRwlockRdlock(&pTsdb->rwLock); code = tsdbFSRef(pTsdb, &pReader->fs); if (code) { taosThreadRwlockUnlock(&pTsdb->rwLock); TSDB_CHECK_CODE(code, lino, _exit); } + taosThreadRwlockUnlock(&pTsdb->rwLock); - code = taosThreadRwlockUnlock(&pTsdb->rwLock); - if (code) { - code = TAOS_SYSTEM_ERROR(code); - TSDB_CHECK_CODE(code, lino, _exit); - } - - // data + // init pReader->fid = INT32_MIN; - for (int32_t iIter = 0; iIter < sizeof(pReader->aFDataIter) / sizeof(pReader->aFDataIter[0]); iIter++) { - SFDataIter* pIter = &pReader->aFDataIter[iIter]; - - if (iIter == 0) { - pIter->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); - if (pIter->aBlockIdx == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - } else { - pIter->aSttBlk = taosArrayInit(0, sizeof(SSttBlk)); - if (pIter->aSttBlk == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - } - - code = tBlockDataCreate(&pIter->bData); - TSDB_CHECK_CODE(code, lino, _exit); - } code = tBlockDataCreate(&pReader->bData); TSDB_CHECK_CODE(code, lino, _exit); - // del - pReader->aDelIdx = taosArrayInit(0, sizeof(SDelIdx)); - if (pReader->aDelIdx == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - pReader->aDelData = taosArrayInit(0, sizeof(SDelData)); - if (pReader->aDelData == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - _exit: if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s, TSDB path: %s", TD_VID(pTsdb->pVnode), __func__, lino, - tstrerror(code), pTsdb->path); - *ppReader = NULL; - + tsdbError("vgId:%d %s failed at line %d since %s, sver:%" PRId64 " ever:%" PRId64 " type:%d", TD_VID(pTsdb->pVnode), + __func__, lino, tstrerror(code), sver, ever, type); if (pReader) { - taosArrayDestroy(pReader->aDelData); - taosArrayDestroy(pReader->aDelIdx); tBlockDataDestroy(&pReader->bData, 1); - tsdbFSDestroy(&pReader->fs); + tsdbFSUnref(pTsdb, &pReader->fs); taosMemoryFree(pReader); + pReader = NULL; } } else { - *ppReader = pReader; - tsdbInfo("vgId:%d, vnode snapshot tsdb reader opened for %s", TD_VID(pTsdb->pVnode), pTsdb->path); + tsdbInfo("vgId:%d %s done, sver:%" PRId64 " ever:%" PRId64 " type:%d", TD_VID(pTsdb->pVnode), __func__, sver, ever, + type); } + *ppReader = pReader; return code; } int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) { - int32_t code = 0; - STsdbSnapReader* pReader = *ppReader; - - // data - if (pReader->pDataFReader) tsdbDataFReaderClose(&pReader->pDataFReader); - for (int32_t iIter = 0; iIter < sizeof(pReader->aFDataIter) / sizeof(pReader->aFDataIter[0]); iIter++) { - SFDataIter* pIter = &pReader->aFDataIter[iIter]; + int32_t code = 0; + int32_t lino = 0; - if (iIter == 0) { - taosArrayDestroy(pIter->aBlockIdx); - tMapDataClear(&pIter->mBlock); - } else { - taosArrayDestroy(pIter->aSttBlk); - } + STsdbSnapReader* pReader = *ppReader; - tBlockDataDestroy(&pIter->bData, 1); + // tombstone + if (pReader->pTIter) { + tsdbCloseDataIter2(pReader->pTIter); + pReader->pTIter = NULL; } + if (pReader->pDelFReader) { + tsdbDelFReaderClose(&pReader->pDelFReader); + } + taosArrayDestroy(pReader->aDelData); + // timeseries + while (pReader->iterList) { + STsdbDataIter2* pIter = pReader->iterList; + pReader->iterList = pIter->next; + tsdbCloseDataIter2(pIter); + } + if (pReader->pDataFReader) { + tsdbDataFReaderClose(&pReader->pDataFReader); + } tBlockDataDestroy(&pReader->bData, 1); - tDestroyTSchema(pReader->skmTable.pTSchema); - - // del - if (pReader->pDelFReader) tsdbDelFReaderClose(&pReader->pDelFReader); - taosArrayDestroy(pReader->aDelIdx); - taosArrayDestroy(pReader->aDelData); + // other + tDestroyTSchema(pReader->skmTable.pTSchema); tsdbFSUnref(pReader->pTsdb, &pReader->fs); - - tsdbInfo("vgId:%d, vnode snapshot tsdb reader closed for %s", TD_VID(pReader->pTsdb->pVnode), pReader->pTsdb->path); - for (int32_t iBuf = 0; iBuf < sizeof(pReader->aBuf) / sizeof(pReader->aBuf[0]); iBuf++) { tFree(pReader->aBuf[iBuf]); } - taosMemoryFree(pReader); + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code)); + } else { + tsdbDebug("vgId:%d %s done", TD_VID(pReader->pTsdb->pVnode), __func__); + } *ppReader = NULL; return code; } @@ -600,7 +973,7 @@ int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData) { // read data file if (!pReader->dataDone) { - code = tsdbSnapReadData(pReader, ppData); + code = tsdbSnapReadTimeSeriesData(pReader, ppData); TSDB_CHECK_CODE(code, lino, _exit); if (*ppData) { goto _exit; @@ -611,7 +984,7 @@ int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData) { // read del file if (!pReader->delDone) { - code = tsdbSnapReadDel(pReader, ppData); + code = tsdbSnapReadTombData(pReader, ppData); TSDB_CHECK_CODE(code, lino, _exit); if (*ppData) { goto _exit; @@ -622,22 +995,18 @@ int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData) { _exit: if (code) { - tsdbError("vgId:%d, %s failed since %s, path:%s", TD_VID(pReader->pTsdb->pVnode), __func__, tstrerror(code), - pReader->pTsdb->path); + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code)); } else { - tsdbDebug("vgId:%d, %s done, path:%s", TD_VID(pReader->pTsdb->pVnode), __func__, pReader->pTsdb->path); + tsdbDebug("vgId:%d %s done", TD_VID(pReader->pTsdb->pVnode), __func__); } return code; } // STsdbSnapWriter ======================================== struct STsdbSnapWriter { - STsdb* pTsdb; - int64_t sver; - int64_t ever; - STsdbFS fs; - - // config + STsdb* pTsdb; + int64_t sver; + int64_t ever; int32_t minutes; int8_t precision; int32_t minRow; @@ -646,641 +1015,816 @@ struct STsdbSnapWriter { int64_t commitID; uint8_t* aBuf[5]; - // for data file - SBlockData bData; - int32_t fid; - TABLEID id; - SSkmInfo skmTable; - struct { - SDataFReader* pReader; - SArray* aBlockIdx; - int32_t iBlockIdx; - SBlockIdx* pBlockIdx; - SMapData mDataBlk; - int32_t iDataBlk; - SBlockData bData; - int32_t iRow; - } dReader; - struct { - SDataFWriter* pWriter; - SArray* aBlockIdx; - SMapData mDataBlk; - SArray* aSttBlk; - SBlockData bData; - SBlockData sData; - } dWriter; - - // for del file - SDelFReader* pDelFReader; + STsdbFS fs; + TABLEID tbid; + + // time-series data + SBlockData inData; + + int32_t fid; + SSkmInfo skmTable; + + /* reader */ + SDataFReader* pDataFReader; + STsdbDataIter2* iterList; + STsdbDataIter2* pDIter; + STsdbDataIter2* pSIter; + SRBTree rbt; // SRBTree + + /* writer */ + SDataFWriter* pDataFWriter; + SArray* aBlockIdx; + SMapData mDataBlk; // SMapData + SArray* aSttBlk; // SArray + SBlockData bData; + SBlockData sData; + + // tombstone data + /* reader */ + SDelFReader* pDelFReader; + STsdbDataIter2* pTIter; + + /* writer */ SDelFWriter* pDelFWriter; - int32_t iDelIdx; - SArray* aDelIdxR; + SArray* aDelIdx; SArray* aDelData; - SArray* aDelIdxW; }; // SNAP_DATA_TSDB -extern int32_t tsdbWriteDataBlock(SDataFWriter* pWriter, SBlockData* pBlockData, SMapData* mDataBlk, int8_t cmprAlg); -extern int32_t tsdbWriteSttBlock(SDataFWriter* pWriter, SBlockData* pBlockData, SArray* aSttBlk, int8_t cmprAlg); - -static int32_t tsdbSnapNextTableData(STsdbSnapWriter* pWriter) { +static int32_t tsdbSnapWriteTableDataStart(STsdbSnapWriter* pWriter, TABLEID* pId) { int32_t code = 0; + int32_t lino = 0; + + if (pId) { + pWriter->tbid = *pId; + } else { + pWriter->tbid = (TABLEID){INT64_MAX, INT64_MAX}; + } + + if (pWriter->pDIter) { + STsdbDataIter2* pIter = pWriter->pDIter; + + // assert last table data end + ASSERT(pIter->dIter.iRow >= pIter->dIter.bData.nRow); + ASSERT(pIter->dIter.iDataBlk >= pIter->dIter.mDataBlk.nItem); + + for (;;) { + if (pIter->dIter.iBlockIdx >= taosArrayGetSize(pIter->dIter.aBlockIdx)) { + pWriter->pDIter = NULL; + break; + } + + SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(pIter->dIter.aBlockIdx, pIter->dIter.iBlockIdx); + + int32_t c = tTABLEIDCmprFn(pBlockIdx, &pWriter->tbid); + if (c < 0) { + code = tsdbReadDataBlk(pIter->dIter.pReader, pBlockIdx, &pIter->dIter.mDataBlk); + TSDB_CHECK_CODE(code, lino, _exit); + + SBlockIdx* pNewBlockIdx = taosArrayReserve(pWriter->aBlockIdx, 1); + if (pNewBlockIdx == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + pNewBlockIdx->suid = pBlockIdx->suid; + pNewBlockIdx->uid = pBlockIdx->uid; + + code = tsdbWriteDataBlk(pWriter->pDataFWriter, &pIter->dIter.mDataBlk, pNewBlockIdx); + TSDB_CHECK_CODE(code, lino, _exit); + + pIter->dIter.iBlockIdx++; + } else if (c == 0) { + code = tsdbReadDataBlk(pIter->dIter.pReader, pBlockIdx, &pIter->dIter.mDataBlk); + TSDB_CHECK_CODE(code, lino, _exit); + + pIter->dIter.iDataBlk = 0; + pIter->dIter.iBlockIdx++; + + break; + } else { + pIter->dIter.iDataBlk = pIter->dIter.mDataBlk.nItem; + break; + } + } + } + + if (pId) { + code = tsdbUpdateTableSchema(pWriter->pTsdb->pVnode->pMeta, pId->suid, pId->uid, &pWriter->skmTable); + TSDB_CHECK_CODE(code, lino, _exit); - ASSERT(pWriter->dReader.iRow >= pWriter->dReader.bData.nRow); + tMapDataReset(&pWriter->mDataBlk); - if (pWriter->dReader.iBlockIdx < taosArrayGetSize(pWriter->dReader.aBlockIdx)) { - pWriter->dReader.pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->dReader.aBlockIdx, pWriter->dReader.iBlockIdx); + code = tBlockDataInit(&pWriter->bData, pId, pWriter->skmTable.pTSchema, NULL, 0); + TSDB_CHECK_CODE(code, lino, _exit); + } + + if (!TABLE_SAME_SCHEMA(pWriter->tbid.suid, pWriter->tbid.uid, pWriter->sData.suid, pWriter->sData.uid)) { + if ((pWriter->sData.nRow > 0)) { + code = tsdbWriteSttBlock(pWriter->pDataFWriter, &pWriter->sData, pWriter->aSttBlk, pWriter->cmprAlg); + TSDB_CHECK_CODE(code, lino, _exit); + } - code = tsdbReadDataBlk(pWriter->dReader.pReader, pWriter->dReader.pBlockIdx, &pWriter->dReader.mDataBlk); - if (code) goto _exit; + if (pId) { + TABLEID id = {.suid = pWriter->tbid.suid, .uid = pWriter->tbid.suid ? 0 : pWriter->tbid.uid}; + code = tBlockDataInit(&pWriter->sData, &id, pWriter->skmTable.pTSchema, NULL, 0); + TSDB_CHECK_CODE(code, lino, _exit); + } + } - pWriter->dReader.iBlockIdx++; +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code)); } else { - pWriter->dReader.pBlockIdx = NULL; - tMapDataReset(&pWriter->dReader.mDataBlk); + tsdbTrace("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64, TD_VID(pWriter->pTsdb->pVnode), __func__, + pWriter->tbid.suid, pWriter->tbid.uid); + } + return code; +} + +static int32_t tsdbSnapWriteTableRowImpl(STsdbSnapWriter* pWriter, TSDBROW* pRow) { + int32_t code = 0; + int32_t lino = 0; + + code = tBlockDataAppendRow(&pWriter->bData, pRow, pWriter->skmTable.pTSchema, pWriter->tbid.uid); + TSDB_CHECK_CODE(code, lino, _exit); + + if (pWriter->bData.nRow >= pWriter->maxRow) { + code = tsdbWriteDataBlock(pWriter->pDataFWriter, &pWriter->bData, &pWriter->mDataBlk, pWriter->cmprAlg); + TSDB_CHECK_CODE(code, lino, _exit); } - pWriter->dReader.iDataBlk = 0; // point to the next one - tBlockDataReset(&pWriter->dReader.bData); - pWriter->dReader.iRow = 0; _exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code)); + } return code; } -static int32_t tsdbSnapWriteCopyData(STsdbSnapWriter* pWriter, TABLEID* pId) { +static int32_t tsdbSnapWriteTableRow(STsdbSnapWriter* pWriter, TSDBROW* pRow) { int32_t code = 0; + int32_t lino = 0; + + TSDBKEY inKey = pRow ? TSDBROW_KEY(pRow) : TSDBKEY_MAX; - while (true) { - if (pWriter->dReader.pBlockIdx == NULL) break; - if (tTABLEIDCmprFn(pWriter->dReader.pBlockIdx, pId) >= 0) break; + if (pWriter->pDIter == NULL || (pWriter->pDIter->dIter.iRow >= pWriter->pDIter->dIter.bData.nRow && + pWriter->pDIter->dIter.iDataBlk >= pWriter->pDIter->dIter.mDataBlk.nItem)) { + goto _write_row; + } else { + for (;;) { + while (pWriter->pDIter->dIter.iRow < pWriter->pDIter->dIter.bData.nRow) { + TSDBROW row = tsdbRowFromBlockData(&pWriter->pDIter->dIter.bData, pWriter->pDIter->dIter.iRow); + + int32_t c = tsdbKeyCmprFn(&inKey, &TSDBROW_KEY(&row)); + if (c < 0) { + goto _write_row; + } else if (c > 0) { + code = tsdbSnapWriteTableRowImpl(pWriter, &row); + TSDB_CHECK_CODE(code, lino, _exit); + + pWriter->pDIter->dIter.iRow++; + } else { + ASSERT(0); + } + } - SBlockIdx blkIdx = *pWriter->dReader.pBlockIdx; - code = tsdbWriteDataBlk(pWriter->dWriter.pWriter, &pWriter->dReader.mDataBlk, &blkIdx); - if (code) goto _exit; + for (;;) { + if (pWriter->pDIter->dIter.iDataBlk >= pWriter->pDIter->dIter.mDataBlk.nItem) goto _write_row; - if (taosArrayPush(pWriter->dWriter.aBlockIdx, &blkIdx) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; + // FIXME: Here can be slow, use array instead + SDataBlk dataBlk; + tMapDataGetItemByIdx(&pWriter->pDIter->dIter.mDataBlk, pWriter->pDIter->dIter.iDataBlk, &dataBlk, tGetDataBlk); + + int32_t c = tDataBlkCmprFn(&dataBlk, &(SDataBlk){.minKey = inKey, .maxKey = inKey}); + if (c > 0) { + goto _write_row; + } else if (c < 0) { + if (pWriter->bData.nRow > 0) { + code = tsdbWriteDataBlock(pWriter->pDataFWriter, &pWriter->bData, &pWriter->mDataBlk, pWriter->cmprAlg); + TSDB_CHECK_CODE(code, lino, _exit); + } + + tMapDataPutItem(&pWriter->pDIter->dIter.mDataBlk, &dataBlk, tPutDataBlk); + pWriter->pDIter->dIter.iDataBlk++; + } else { + code = tsdbReadDataBlockEx(pWriter->pDataFReader, &dataBlk, &pWriter->pDIter->dIter.bData); + TSDB_CHECK_CODE(code, lino, _exit); + + pWriter->pDIter->dIter.iRow = 0; + pWriter->pDIter->dIter.iDataBlk++; + break; + } + } } + } - code = tsdbSnapNextTableData(pWriter); - if (code) goto _exit; +_write_row: + if (pRow) { + code = tsdbSnapWriteTableRowImpl(pWriter, pRow); + TSDB_CHECK_CODE(code, lino, _exit); } _exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code)); + } return code; } -static int32_t tsdbSnapWriteTableDataStart(STsdbSnapWriter* pWriter, TABLEID* pId) { +static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) { int32_t code = 0; + int32_t lino = 0; + + // write a NULL row to end current table data write + code = tsdbSnapWriteTableRow(pWriter, NULL); + TSDB_CHECK_CODE(code, lino, _exit); - code = tsdbSnapWriteCopyData(pWriter, pId); - if (code) goto _err; + if (pWriter->bData.nRow > 0) { + if (pWriter->bData.nRow < pWriter->minRow) { + ASSERT(TABLE_SAME_SCHEMA(pWriter->sData.suid, pWriter->sData.uid, pWriter->tbid.suid, pWriter->tbid.uid)); + for (int32_t iRow = 0; iRow < pWriter->bData.nRow; iRow++) { + code = + tBlockDataAppendRow(&pWriter->sData, &tsdbRowFromBlockData(&pWriter->bData, iRow), NULL, pWriter->tbid.uid); + TSDB_CHECK_CODE(code, lino, _exit); + + if (pWriter->sData.nRow >= pWriter->maxRow) { + code = tsdbWriteSttBlock(pWriter->pDataFWriter, &pWriter->sData, pWriter->aSttBlk, pWriter->cmprAlg); + TSDB_CHECK_CODE(code, lino, _exit); + } + } - pWriter->id.suid = pId->suid; - pWriter->id.uid = pId->uid; + tBlockDataClear(&pWriter->bData); + } else { + code = tsdbWriteDataBlock(pWriter->pDataFWriter, &pWriter->bData, &pWriter->mDataBlk, pWriter->cmprAlg); + TSDB_CHECK_CODE(code, lino, _exit); + } + } - code = tsdbUpdateTableSchema(pWriter->pTsdb->pVnode->pMeta, pId->suid, pId->uid, &pWriter->skmTable); - if (code) goto _err; + if (pWriter->mDataBlk.nItem) { + SBlockIdx* pBlockIdx = taosArrayReserve(pWriter->aBlockIdx, 1); + if (pBlockIdx == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } - tMapDataReset(&pWriter->dWriter.mDataBlk); - code = tBlockDataInit(&pWriter->dWriter.bData, pId, pWriter->skmTable.pTSchema, NULL, 0); - if (code) goto _err; + pBlockIdx->suid = pWriter->tbid.suid; + pBlockIdx->uid = pWriter->tbid.uid; - return code; + code = tsdbWriteDataBlk(pWriter->pDataFWriter, &pWriter->mDataBlk, pBlockIdx); + TSDB_CHECK_CODE(code, lino, _exit); + } -_err: - tsdbError("vgId:%d, %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, tstrerror(code)); +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code)); + } return code; } -static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) { +static int32_t tsdbSnapWriteFileDataStart(STsdbSnapWriter* pWriter, int32_t fid) { int32_t code = 0; + int32_t lino = 0; - if (pWriter->id.suid == 0 && pWriter->id.uid == 0) return code; + ASSERT(pWriter->pDataFWriter == NULL && pWriter->fid < fid); - int32_t c = 1; - if (pWriter->dReader.pBlockIdx) { - c = tTABLEIDCmprFn(pWriter->dReader.pBlockIdx, &pWriter->id); - ASSERT(c >= 0); - } + STsdb* pTsdb = pWriter->pTsdb; + + pWriter->fid = fid; + pWriter->tbid = (TABLEID){0}; + SDFileSet* pSet = taosArraySearch(pWriter->fs.aDFileSet, &(SDFileSet){.fid = fid}, tDFileSetCmprFn, TD_EQ); + + // open reader + pWriter->pDataFReader = NULL; + pWriter->iterList = NULL; + pWriter->pDIter = NULL; + pWriter->pSIter = NULL; + tRBTreeCreate(&pWriter->rbt, tsdbDataIterCmprFn); + if (pSet) { + code = tsdbDataFReaderOpen(&pWriter->pDataFReader, pTsdb, pSet); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbOpenDataFileDataIter(pWriter->pDataFReader, &pWriter->pDIter); + TSDB_CHECK_CODE(code, lino, _exit); + if (pWriter->pDIter) { + pWriter->pDIter->next = pWriter->iterList; + pWriter->iterList = pWriter->pDIter; + } - if (c == 0) { - SBlockData* pBData = &pWriter->dWriter.bData; + for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) { + code = tsdbOpenSttFileDataIter(pWriter->pDataFReader, iStt, &pWriter->pSIter); + TSDB_CHECK_CODE(code, lino, _exit); - for (; pWriter->dReader.iRow < pWriter->dReader.bData.nRow; pWriter->dReader.iRow++) { - TSDBROW row = tsdbRowFromBlockData(&pWriter->dReader.bData, pWriter->dReader.iRow); + if (pWriter->pSIter) { + code = tsdbSttFileDataIterNext(pWriter->pSIter, NULL); + TSDB_CHECK_CODE(code, lino, _exit); - code = tBlockDataAppendRow(pBData, &row, NULL, pWriter->id.uid); - if (code) goto _err; + // add to tree + tRBTreePut(&pWriter->rbt, &pWriter->pSIter->rbtn); - if (pBData->nRow >= pWriter->maxRow) { - code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, pBData, &pWriter->dWriter.mDataBlk, pWriter->cmprAlg); - if (code) goto _err; + // add to list + pWriter->pSIter->next = pWriter->iterList; + pWriter->iterList = pWriter->pSIter; } } - code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, pBData, &pWriter->dWriter.mDataBlk, pWriter->cmprAlg); - if (code) goto _err; + pWriter->pSIter = NULL; + } + + // open writer + SDiskID diskId; + if (pSet) { + diskId = pSet->diskId; + } else { + tfsAllocDisk(pTsdb->pVnode->pTfs, 0 /*TODO*/, &diskId); + tfsMkdirRecurAt(pTsdb->pVnode->pTfs, pTsdb->path, diskId); + } + SDFileSet wSet = {.diskId = diskId, + .fid = fid, + .pHeadF = &(SHeadFile){.commitID = pWriter->commitID}, + .pDataF = (pSet) ? pSet->pDataF : &(SDataFile){.commitID = pWriter->commitID}, + .pSmaF = (pSet) ? pSet->pSmaF : &(SSmaFile){.commitID = pWriter->commitID}, + .nSttF = 1, + .aSttF = {&(SSttFile){.commitID = pWriter->commitID}}}; + code = tsdbDataFWriterOpen(&pWriter->pDataFWriter, pTsdb, &wSet); + TSDB_CHECK_CODE(code, lino, _exit); - for (; pWriter->dReader.iDataBlk < pWriter->dReader.mDataBlk.nItem; pWriter->dReader.iDataBlk++) { - SDataBlk dataBlk; - tMapDataGetItemByIdx(&pWriter->dReader.mDataBlk, pWriter->dReader.iDataBlk, &dataBlk, tGetDataBlk); + if (pWriter->aBlockIdx) { + taosArrayClear(pWriter->aBlockIdx); + } else if ((pWriter->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx))) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } - code = tMapDataPutItem(&pWriter->dWriter.mDataBlk, &dataBlk, tPutDataBlk); - if (code) goto _err; - } + tMapDataReset(&pWriter->mDataBlk); - code = tsdbSnapNextTableData(pWriter); - if (code) goto _err; + if (pWriter->aSttBlk) { + taosArrayClear(pWriter->aSttBlk); + } else if ((pWriter->aSttBlk = taosArrayInit(0, sizeof(SSttBlk))) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); } - if (pWriter->dWriter.mDataBlk.nItem) { - SBlockIdx blockIdx = {.suid = pWriter->id.suid, .uid = pWriter->id.uid}; - code = tsdbWriteDataBlk(pWriter->dWriter.pWriter, &pWriter->dWriter.mDataBlk, &blockIdx); + tBlockDataReset(&pWriter->bData); + tBlockDataReset(&pWriter->sData); - if (taosArrayPush(pWriter->dWriter.aBlockIdx, &blockIdx) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s, fid:%d", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code), + fid); + } else { + tsdbDebug("vgId:%d %s done, fid:%d", TD_VID(pTsdb->pVnode), __func__, fid); + } + return code; +} + +static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, SRowInfo* pRowInfo) { + int32_t code = 0; + int32_t lino = 0; + + // switch to new table if need + if (pRowInfo == NULL || pRowInfo->uid != pWriter->tbid.uid) { + if (pWriter->tbid.uid) { + code = tsdbSnapWriteTableDataEnd(pWriter); + TSDB_CHECK_CODE(code, lino, _exit); } + + code = tsdbSnapWriteTableDataStart(pWriter, (TABLEID*)pRowInfo); + TSDB_CHECK_CODE(code, lino, _exit); } - pWriter->id.suid = 0; - pWriter->id.uid = 0; + if (pRowInfo == NULL) goto _exit; - return code; + code = tsdbSnapWriteTableRow(pWriter, &pRowInfo->row); + TSDB_CHECK_CODE(code, lino, _exit); -_err: +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code)); + } return code; } -static int32_t tsdbSnapWriteOpenFile(STsdbSnapWriter* pWriter, int32_t fid) { +static int32_t tsdbSnapWriteNextRow(STsdbSnapWriter* pWriter, SRowInfo** ppRowInfo) { int32_t code = 0; - STsdb* pTsdb = pWriter->pTsdb; - - ASSERT(pWriter->dWriter.pWriter == NULL); + int32_t lino = 0; - pWriter->fid = fid; - pWriter->id = (TABLEID){0}; - SDFileSet* pSet = taosArraySearch(pWriter->fs.aDFileSet, &(SDFileSet){.fid = fid}, tDFileSetCmprFn, TD_EQ); + if (pWriter->pSIter) { + code = tsdbDataIterNext2(pWriter->pSIter, NULL); + TSDB_CHECK_CODE(code, lino, _exit); - // Reader - if (pSet) { - code = tsdbDataFReaderOpen(&pWriter->dReader.pReader, pWriter->pTsdb, pSet); - if (code) goto _err; + if (pWriter->pSIter->rowInfo.suid == 0 && pWriter->pSIter->rowInfo.uid == 0) { + pWriter->pSIter = NULL; + } else { + SRBTreeNode* pNode = tRBTreeMin(&pWriter->rbt); + if (pNode) { + int32_t c = tsdbDataIterCmprFn(&pWriter->pSIter->rbtn, pNode); + if (c > 0) { + tRBTreePut(&pWriter->rbt, &pWriter->pSIter->rbtn); + pWriter->pSIter = NULL; + } else if (c == 0) { + ASSERT(0); + } + } + } + } - code = tsdbReadBlockIdx(pWriter->dReader.pReader, pWriter->dReader.aBlockIdx); - if (code) goto _err; - } else { - ASSERT(pWriter->dReader.pReader == NULL); - taosArrayClear(pWriter->dReader.aBlockIdx); - } - pWriter->dReader.iBlockIdx = 0; // point to the next one - code = tsdbSnapNextTableData(pWriter); - if (code) goto _err; - - // Writer - SHeadFile fHead = {.commitID = pWriter->commitID}; - SDataFile fData = {.commitID = pWriter->commitID}; - SSmaFile fSma = {.commitID = pWriter->commitID}; - SSttFile fStt = {.commitID = pWriter->commitID}; - SDFileSet wSet = {.fid = pWriter->fid, .pHeadF = &fHead, .pDataF = &fData, .pSmaF = &fSma}; - if (pSet) { - wSet.diskId = pSet->diskId; - fData = *pSet->pDataF; - fSma = *pSet->pSmaF; - for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) { - wSet.aSttF[iStt] = pSet->aSttF[iStt]; + if (pWriter->pSIter == NULL) { + SRBTreeNode* pNode = tRBTreeMin(&pWriter->rbt); + if (pNode) { + tRBTreeDrop(&pWriter->rbt, pNode); + pWriter->pSIter = TSDB_RBTN_TO_DATA_ITER(pNode); } - wSet.nSttF = pSet->nSttF + 1; // TODO: fix pSet->nSttF == pTsdb->maxFile - } else { - SDiskID did = {0}; - tfsAllocDisk(pTsdb->pVnode->pTfs, 0, &did); - tfsMkdirRecurAt(pTsdb->pVnode->pTfs, pTsdb->path, did); - wSet.diskId = did; - wSet.nSttF = 1; - } - wSet.aSttF[wSet.nSttF - 1] = &fStt; - - code = tsdbDataFWriterOpen(&pWriter->dWriter.pWriter, pWriter->pTsdb, &wSet); - if (code) goto _err; - taosArrayClear(pWriter->dWriter.aBlockIdx); - tMapDataReset(&pWriter->dWriter.mDataBlk); - taosArrayClear(pWriter->dWriter.aSttBlk); - tBlockDataReset(&pWriter->dWriter.bData); - tBlockDataReset(&pWriter->dWriter.sData); + } - return code; + if (ppRowInfo) { + if (pWriter->pSIter) { + *ppRowInfo = &pWriter->pSIter->rowInfo; + } else { + *ppRowInfo = NULL; + } + } -_err: +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code)); + } return code; } -static int32_t tsdbSnapWriteCloseFile(STsdbSnapWriter* pWriter) { +static int32_t tsdbSnapWriteGetRow(STsdbSnapWriter* pWriter, SRowInfo** ppRowInfo) { int32_t code = 0; + int32_t lino = 0; - ASSERT(pWriter->dWriter.pWriter); - - code = tsdbSnapWriteTableDataEnd(pWriter); - if (code) goto _err; - - // copy remain table data - TABLEID id = {.suid = INT64_MAX, .uid = INT64_MAX}; - code = tsdbSnapWriteCopyData(pWriter, &id); - if (code) goto _err; - - code = - tsdbWriteSttBlock(pWriter->dWriter.pWriter, &pWriter->dWriter.sData, pWriter->dWriter.aSttBlk, pWriter->cmprAlg); - if (code) goto _err; - - // Indices - code = tsdbWriteBlockIdx(pWriter->dWriter.pWriter, pWriter->dWriter.aBlockIdx); - if (code) goto _err; - - code = tsdbWriteSttBlk(pWriter->dWriter.pWriter, pWriter->dWriter.aSttBlk); - if (code) goto _err; - - code = tsdbUpdateDFileSetHeader(pWriter->dWriter.pWriter); - if (code) goto _err; - - code = tsdbFSUpsertFSet(&pWriter->fs, &pWriter->dWriter.pWriter->wSet); - if (code) goto _err; - - code = tsdbDataFWriterClose(&pWriter->dWriter.pWriter, 1); - if (code) goto _err; - - if (pWriter->dReader.pReader) { - code = tsdbDataFReaderClose(&pWriter->dReader.pReader); - if (code) goto _err; + if (pWriter->pSIter) { + *ppRowInfo = &pWriter->pSIter->rowInfo; + goto _exit; } -_exit: - return code; + code = tsdbSnapWriteNextRow(pWriter, ppRowInfo); + TSDB_CHECK_CODE(code, lino, _exit); -_err: +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code)); + } return code; } -static int32_t tsdbSnapWriteToDataFile(STsdbSnapWriter* pWriter, int32_t iRow, int8_t* done) { +static int32_t tsdbSnapWriteFileDataEnd(STsdbSnapWriter* pWriter) { int32_t code = 0; + int32_t lino = 0; - SBlockData* pBData = &pWriter->bData; - TABLEID id = {.suid = pBData->suid, .uid = pBData->uid ? pBData->uid : pBData->aUid[iRow]}; - TSDBROW row = tsdbRowFromBlockData(pBData, iRow); - TSDBKEY key = TSDBROW_KEY(&row); + ASSERT(pWriter->pDataFWriter); - *done = 0; - while (pWriter->dReader.iRow < pWriter->dReader.bData.nRow || - pWriter->dReader.iDataBlk < pWriter->dReader.mDataBlk.nItem) { - // Merge row by row - for (; pWriter->dReader.iRow < pWriter->dReader.bData.nRow; pWriter->dReader.iRow++) { - TSDBROW trow = tsdbRowFromBlockData(&pWriter->dReader.bData, pWriter->dReader.iRow); - TSDBKEY tKey = TSDBROW_KEY(&trow); + // consume remain data and end with a NULL table row + SRowInfo* pRowInfo; + code = tsdbSnapWriteGetRow(pWriter, &pRowInfo); + TSDB_CHECK_CODE(code, lino, _exit); + for (;;) { + code = tsdbSnapWriteTableData(pWriter, pRowInfo); + TSDB_CHECK_CODE(code, lino, _exit); - ASSERT(pWriter->dReader.bData.suid == id.suid && pWriter->dReader.bData.uid == id.uid); + if (pRowInfo == NULL) break; - int32_t c = tsdbKeyCmprFn(&key, &tKey); - if (c < 0) { - code = tBlockDataAppendRow(&pWriter->dWriter.bData, &row, NULL, id.uid); - if (code) goto _err; - } else if (c > 0) { - code = tBlockDataAppendRow(&pWriter->dWriter.bData, &trow, NULL, id.uid); - if (code) goto _err; - } else { - ASSERT(0); - } + code = tsdbSnapWriteNextRow(pWriter, &pRowInfo); + TSDB_CHECK_CODE(code, lino, _exit); + } - if (pWriter->dWriter.bData.nRow >= pWriter->maxRow) { - code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, &pWriter->dWriter.bData, &pWriter->dWriter.mDataBlk, - pWriter->cmprAlg); - if (code) goto _err; - } + // do file-level updates + code = tsdbWriteSttBlk(pWriter->pDataFWriter, pWriter->aSttBlk); + TSDB_CHECK_CODE(code, lino, _exit); - if (c < 0) { - *done = 1; - goto _exit; - } - } + code = tsdbWriteBlockIdx(pWriter->pDataFWriter, pWriter->aBlockIdx); + TSDB_CHECK_CODE(code, lino, _exit); - // Merge row by block - SDataBlk tDataBlk = {.minKey = key, .maxKey = key}; - for (; pWriter->dReader.iDataBlk < pWriter->dReader.mDataBlk.nItem; pWriter->dReader.iDataBlk++) { - SDataBlk dataBlk; - tMapDataGetItemByIdx(&pWriter->dReader.mDataBlk, pWriter->dReader.iDataBlk, &dataBlk, tGetDataBlk); + code = tsdbUpdateDFileSetHeader(pWriter->pDataFWriter); + TSDB_CHECK_CODE(code, lino, _exit); - int32_t c = tDataBlkCmprFn(&dataBlk, &tDataBlk); - if (c < 0) { - code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, &pWriter->dWriter.bData, &pWriter->dWriter.mDataBlk, - pWriter->cmprAlg); - if (code) goto _err; - - code = tMapDataPutItem(&pWriter->dWriter.mDataBlk, &dataBlk, tPutDataBlk); - if (code) goto _err; - } else if (c > 0) { - code = tBlockDataAppendRow(&pWriter->dWriter.bData, &row, NULL, id.uid); - if (code) goto _err; - - if (pWriter->dWriter.bData.nRow >= pWriter->maxRow) { - code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, &pWriter->dWriter.bData, &pWriter->dWriter.mDataBlk, - pWriter->cmprAlg); - if (code) goto _err; - } + code = tsdbFSUpsertFSet(&pWriter->fs, &pWriter->pDataFWriter->wSet); + TSDB_CHECK_CODE(code, lino, _exit); - *done = 1; - goto _exit; - } else { - code = tsdbReadDataBlockEx(pWriter->dReader.pReader, &dataBlk, &pWriter->dReader.bData); - if (code) goto _err; - pWriter->dReader.iRow = 0; + code = tsdbDataFWriterClose(&pWriter->pDataFWriter, 1); + TSDB_CHECK_CODE(code, lino, _exit); - pWriter->dReader.iDataBlk++; - break; - } - } + if (pWriter->pDataFReader) { + code = tsdbDataFReaderClose(&pWriter->pDataFReader); + TSDB_CHECK_CODE(code, lino, _exit); } -_exit: - return code; + // clear sources + while (pWriter->iterList) { + STsdbDataIter2* pIter = pWriter->iterList; + pWriter->iterList = pIter->next; + tsdbCloseDataIter2(pIter); + } -_err: - tsdbError("vgId:%d, %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, tstrerror(code)); +_exit: + if (code) { + tsdbError("vgId:%d %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, tstrerror(code)); + } else { + tsdbDebug("vgId:%d %s is done", TD_VID(pWriter->pTsdb->pVnode), __func__); + } return code; } -static int32_t tsdbSnapWriteToSttFile(STsdbSnapWriter* pWriter, int32_t iRow) { +static int32_t tsdbSnapWriteTimeSeriesData(STsdbSnapWriter* pWriter, SSnapDataHdr* pHdr) { int32_t code = 0; + int32_t lino = 0; - TABLEID id = {.suid = pWriter->bData.suid, - .uid = pWriter->bData.uid ? pWriter->bData.uid : pWriter->bData.aUid[iRow]}; - TSDBROW row = tsdbRowFromBlockData(&pWriter->bData, iRow); - SBlockData* pBData = &pWriter->dWriter.sData; + code = tDecmprBlockData(pHdr->data, pHdr->size, &pWriter->inData, pWriter->aBuf); + TSDB_CHECK_CODE(code, lino, _exit); - if (pBData->suid || pBData->uid) { - if (!TABLE_SAME_SCHEMA(pBData->suid, pBData->uid, id.suid, id.uid)) { - code = tsdbWriteSttBlock(pWriter->dWriter.pWriter, pBData, pWriter->dWriter.aSttBlk, pWriter->cmprAlg); - if (code) goto _err; + ASSERT(pWriter->inData.nRow > 0); - pBData->suid = 0; - pBData->uid = 0; + // switch to new data file if need + int32_t fid = tsdbKeyFid(pWriter->inData.aTSKEY[0], pWriter->minutes, pWriter->precision); + if (pWriter->fid != fid) { + if (pWriter->pDataFWriter) { + code = tsdbSnapWriteFileDataEnd(pWriter); + TSDB_CHECK_CODE(code, lino, _exit); } - } - - if (pBData->suid == 0 && pBData->uid == 0) { - code = tsdbUpdateTableSchema(pWriter->pTsdb->pVnode->pMeta, pWriter->id.suid, pWriter->id.uid, &pWriter->skmTable); - if (code) goto _err; - TABLEID tid = {.suid = pWriter->id.suid, .uid = pWriter->id.suid ? 0 : pWriter->id.uid}; - code = tBlockDataInit(pBData, &tid, pWriter->skmTable.pTSchema, NULL, 0); - if (code) goto _err; + code = tsdbSnapWriteFileDataStart(pWriter, fid); + TSDB_CHECK_CODE(code, lino, _exit); } - code = tBlockDataAppendRow(pBData, &row, NULL, id.uid); - if (code) goto _err; + // loop write each row + SRowInfo* pRowInfo; + code = tsdbSnapWriteGetRow(pWriter, &pRowInfo); + TSDB_CHECK_CODE(code, lino, _exit); + for (int32_t iRow = 0; iRow < pWriter->inData.nRow; ++iRow) { + SRowInfo rInfo = {.suid = pWriter->inData.suid, + .uid = pWriter->inData.uid ? pWriter->inData.uid : pWriter->inData.aUid[iRow], + .row = tsdbRowFromBlockData(&pWriter->inData, iRow)}; - if (pBData->nRow >= pWriter->maxRow) { - code = tsdbWriteSttBlock(pWriter->dWriter.pWriter, pBData, pWriter->dWriter.aSttBlk, pWriter->cmprAlg); - if (code) goto _err; + for (;;) { + if (pRowInfo == NULL) { + code = tsdbSnapWriteTableData(pWriter, &rInfo); + TSDB_CHECK_CODE(code, lino, _exit); + break; + } else { + int32_t c = tRowInfoCmprFn(&rInfo, pRowInfo); + if (c < 0) { + code = tsdbSnapWriteTableData(pWriter, &rInfo); + TSDB_CHECK_CODE(code, lino, _exit); + break; + } else if (c > 0) { + code = tsdbSnapWriteTableData(pWriter, pRowInfo); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbSnapWriteNextRow(pWriter, &pRowInfo); + TSDB_CHECK_CODE(code, lino, _exit); + } else { + ASSERT(0); + } + } + } } _exit: - return code; - -_err: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code)); + } else { + tsdbDebug("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64 " nRow:%d", TD_VID(pWriter->pTsdb->pVnode), __func__, + pWriter->inData.suid, pWriter->inData.uid, pWriter->inData.nRow); + } return code; } -static int32_t tsdbSnapWriteRowData(STsdbSnapWriter* pWriter, int32_t iRow) { +// SNAP_DATA_DEL +static int32_t tsdbSnapWriteDelTableDataStart(STsdbSnapWriter* pWriter, TABLEID* pId) { int32_t code = 0; + int32_t lino = 0; - SBlockData* pBlockData = &pWriter->bData; - TABLEID id = {.suid = pBlockData->suid, .uid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[iRow]}; - - // End last table data write if need - if (tTABLEIDCmprFn(&pWriter->id, &id) != 0) { - code = tsdbSnapWriteTableDataEnd(pWriter); - if (code) goto _err; - } - - // Start new table data write if need - if (pWriter->id.suid == 0 && pWriter->id.uid == 0) { - code = tsdbSnapWriteTableDataStart(pWriter, &id); - if (code) goto _err; - } - - // Merge with .data file data - int8_t done = 0; - if (pWriter->dReader.pBlockIdx && tTABLEIDCmprFn(pWriter->dReader.pBlockIdx, &id) == 0) { - code = tsdbSnapWriteToDataFile(pWriter, iRow, &done); - if (code) goto _err; - } - - // Append to the .stt data block (todo: check if need to set/reload sst block) - if (!done) { - code = tsdbSnapWriteToSttFile(pWriter, iRow); - if (code) goto _err; + if (pId) { + pWriter->tbid = *pId; + } else { + pWriter->tbid = (TABLEID){.suid = INT64_MAX, .uid = INT64_MAX}; } -_exit: - return code; + taosArrayClear(pWriter->aDelData); -_err: - tsdbError("vgId:%d, %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, tstrerror(code)); - return code; -} + if (pWriter->pTIter) { + while (pWriter->pTIter->tIter.iDelIdx < taosArrayGetSize(pWriter->pTIter->tIter.aDelIdx)) { + SDelIdx* pDelIdx = taosArrayGet(pWriter->pTIter->tIter.aDelIdx, pWriter->pTIter->tIter.iDelIdx); -static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { - int32_t code = 0; - STsdb* pTsdb = pWriter->pTsdb; - SBlockData* pBlockData = &pWriter->bData; + int32_t c = tTABLEIDCmprFn(pDelIdx, &pWriter->tbid); + if (c < 0) { + code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->pTIter->tIter.aDelData); + TSDB_CHECK_CODE(code, lino, _exit); - // Decode data - SSnapDataHdr* pHdr = (SSnapDataHdr*)pData; - code = tDecmprBlockData(pHdr->data, pHdr->size, pBlockData, pWriter->aBuf); - if (code) goto _err; + SDelIdx* pDelIdxNew = taosArrayReserve(pWriter->aDelIdx, 1); + if (pDelIdxNew == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } - ASSERT(pBlockData->nRow > 0); + pDelIdxNew->suid = pDelIdx->suid; + pDelIdxNew->uid = pDelIdx->uid; - // Loop to handle each row - for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) { - TSKEY ts = pBlockData->aTSKEY[iRow]; - int32_t fid = tsdbKeyFid(ts, pWriter->minutes, pWriter->precision); + code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->pTIter->tIter.aDelData, pDelIdxNew); + TSDB_CHECK_CODE(code, lino, _exit); - if (pWriter->dWriter.pWriter == NULL || pWriter->fid != fid) { - if (pWriter->dWriter.pWriter) { - // ASSERT(fid > pWriter->fid); + pWriter->pTIter->tIter.iDelIdx++; + } else if (c == 0) { + code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->aDelData); + TSDB_CHECK_CODE(code, lino, _exit); - code = tsdbSnapWriteCloseFile(pWriter); - if (code) goto _err; + pWriter->pTIter->tIter.iDelIdx++; + break; + } else { + break; } - - code = tsdbSnapWriteOpenFile(pWriter, fid); - if (code) goto _err; } - - code = tsdbSnapWriteRowData(pWriter, iRow); - if (code) goto _err; } - return code; - -_err: - tsdbError("vgId:%d, vnode snapshot tsdb write data for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path, - tstrerror(code)); +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code)); + } else { + tsdbTrace("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64, TD_VID(pWriter->pTsdb->pVnode), __func__, pId->suid, + pId->uid); + } return code; } -// SNAP_DATA_DEL -static int32_t tsdbSnapMoveWriteDelData(STsdbSnapWriter* pWriter, TABLEID* pId) { +static int32_t tsdbSnapWriteDelTableDataEnd(STsdbSnapWriter* pWriter) { int32_t code = 0; + int32_t lino = 0; - while (true) { - if (pWriter->iDelIdx >= taosArrayGetSize(pWriter->aDelIdxR)) break; - - SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx); - - if (tTABLEIDCmprFn(pDelIdx, pId) >= 0) break; - - code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->aDelData); - if (code) goto _exit; - - SDelIdx delIdx = *pDelIdx; - code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->aDelData, &delIdx); - if (code) goto _exit; - - if (taosArrayPush(pWriter->aDelIdxW, &delIdx) == NULL) { + if (taosArrayGetSize(pWriter->aDelData) > 0) { + SDelIdx* pDelIdx = taosArrayReserve(pWriter->aDelIdx, 1); + if (pDelIdx == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); } - pWriter->iDelIdx++; + pDelIdx->suid = pWriter->tbid.suid; + pDelIdx->uid = pWriter->tbid.uid; + + code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->aDelData, pDelIdx); + TSDB_CHECK_CODE(code, lino, _exit); } _exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code)); + } else { + tsdbTrace("vgId:%d %s done", TD_VID(pWriter->pTsdb->pVnode), __func__); + } return code; } -static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { +static int32_t tsdbSnapWriteDelTableData(STsdbSnapWriter* pWriter, TABLEID* pId, uint8_t* pData, int64_t size) { int32_t code = 0; - STsdb* pTsdb = pWriter->pTsdb; - - // Open del file if not opened yet - if (pWriter->pDelFWriter == NULL) { - SDelFile* pDelFile = pWriter->fs.pDelFile; - - // reader - if (pDelFile) { - code = tsdbDelFReaderOpen(&pWriter->pDelFReader, pDelFile, pTsdb); - if (code) goto _err; + int32_t lino = 0; - code = tsdbReadDelIdx(pWriter->pDelFReader, pWriter->aDelIdxR); - if (code) goto _err; - } else { - taosArrayClear(pWriter->aDelIdxR); + if (pId == NULL || pId->uid != pWriter->tbid.uid) { + if (pWriter->tbid.uid) { + code = tsdbSnapWriteDelTableDataEnd(pWriter); + TSDB_CHECK_CODE(code, lino, _exit); } - pWriter->iDelIdx = 0; - // writer - SDelFile delFile = {.commitID = pWriter->commitID}; - code = tsdbDelFWriterOpen(&pWriter->pDelFWriter, &delFile, pTsdb); - if (code) goto _err; - taosArrayClear(pWriter->aDelIdxW); + code = tsdbSnapWriteDelTableDataStart(pWriter, pId); + TSDB_CHECK_CODE(code, lino, _exit); } - SSnapDataHdr* pHdr = (SSnapDataHdr*)pData; - TABLEID id = *(TABLEID*)pHdr->data; + if (pId == NULL) goto _exit; - ASSERT(pHdr->size + sizeof(SSnapDataHdr) == nData); + int64_t n = 0; + while (n < size) { + SDelData delData; + n += tGetDelData(pData + n, &delData); - // Move write data < id - code = tsdbSnapMoveWriteDelData(pWriter, &id); - if (code) goto _err; + if (taosArrayPush(pWriter->aDelData, &delData) < 0) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + } + ASSERT(n == size); - // Merge incoming data with current - if (pWriter->iDelIdx < taosArrayGetSize(pWriter->aDelIdxR) && - tTABLEIDCmprFn(taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx), &id) == 0) { - SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx); +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code)); + } + return code; +} - code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->aDelData); - if (code) goto _err; +static int32_t tsdbSnapWriteDelDataStart(STsdbSnapWriter* pWriter) { + int32_t code = 0; + int32_t lino = 0; - pWriter->iDelIdx++; - } else { - taosArrayClear(pWriter->aDelData); - } + STsdb* pTsdb = pWriter->pTsdb; + SDelFile* pDelFile = pWriter->fs.pDelFile; - int64_t n = sizeof(SSnapDataHdr) + sizeof(TABLEID); - while (n < nData) { - SDelData delData; + pWriter->tbid = (TABLEID){0}; - n += tGetDelData(pData + n, &delData); + // reader + if (pDelFile) { + code = tsdbDelFReaderOpen(&pWriter->pDelFReader, pDelFile, pTsdb); + TSDB_CHECK_CODE(code, lino, _exit); - if (taosArrayPush(pWriter->aDelData, &delData) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } + code = tsdbOpenTombFileDataIter(pWriter->pDelFReader, &pWriter->pTIter); + TSDB_CHECK_CODE(code, lino, _exit); } - SDelIdx delIdx = {.suid = id.suid, .uid = id.uid}; - code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->aDelData, &delIdx); - if (code) goto _err; + // writer + code = tsdbDelFWriterOpen(&pWriter->pDelFWriter, &(SDelFile){.commitID = pWriter->commitID}, pTsdb); + TSDB_CHECK_CODE(code, lino, _exit); - if (taosArrayPush(pWriter->aDelIdxW, &delIdx) == NULL) { + if ((pWriter->aDelIdx = taosArrayInit(0, sizeof(SDelIdx))) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); + } + if ((pWriter->aDelData = taosArrayInit(0, sizeof(SDelData))) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); } - return code; - -_err: - tsdbError("vgId:%d, vnode snapshot tsdb write del for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path, - tstrerror(code)); +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + } else { + tsdbDebug("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__); + } return code; } -static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) { +static int32_t tsdbSnapWriteDelDataEnd(STsdbSnapWriter* pWriter) { int32_t code = 0; - STsdb* pTsdb = pWriter->pTsdb; + int32_t lino = 0; - if (pWriter->pDelFWriter == NULL) return code; + STsdb* pTsdb = pWriter->pTsdb; - TABLEID id = {.suid = INT64_MAX, .uid = INT64_MAX}; - code = tsdbSnapMoveWriteDelData(pWriter, &id); - if (code) goto _err; + // end remaining table with NULL data + code = tsdbSnapWriteDelTableData(pWriter, NULL, NULL, 0); + TSDB_CHECK_CODE(code, lino, _exit); - code = tsdbWriteDelIdx(pWriter->pDelFWriter, pWriter->aDelIdxW); - if (code) goto _err; + // update file-level info + code = tsdbWriteDelIdx(pWriter->pDelFWriter, pWriter->aDelIdx); + TSDB_CHECK_CODE(code, lino, _exit); code = tsdbUpdateDelFileHdr(pWriter->pDelFWriter); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); code = tsdbFSUpsertDelFile(&pWriter->fs, &pWriter->pDelFWriter->fDel); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); code = tsdbDelFWriterClose(&pWriter->pDelFWriter, 1); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); if (pWriter->pDelFReader) { code = tsdbDelFReaderClose(&pWriter->pDelFReader); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); + } + + if (pWriter->pTIter) { + tsdbCloseDataIter2(pWriter->pTIter); + pWriter->pTIter = NULL; } - tsdbInfo("vgId:%d, vnode snapshot tsdb write del for %s end", TD_VID(pTsdb->pVnode), pTsdb->path); +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + } else { + tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__); + } return code; +} -_err: - tsdbError("vgId:%d, vnode snapshot tsdb write del end for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path, - tstrerror(code)); +static int32_t tsdbSnapWriteDelData(STsdbSnapWriter* pWriter, SSnapDataHdr* pHdr) { + int32_t code = 0; + int32_t lino = 0; + + STsdb* pTsdb = pWriter->pTsdb; + + // start to write del data if need + if (pWriter->pDelFWriter == NULL) { + code = tsdbSnapWriteDelDataStart(pWriter); + TSDB_CHECK_CODE(code, lino, _exit); + } + + // do write del data + code = tsdbSnapWriteDelTableData(pWriter, (TABLEID*)pHdr->data, pHdr->data + sizeof(TABLEID), + pHdr->size - sizeof(TABLEID)); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + tsdbError("vgId:%d %s failed since %s", TD_VID(pTsdb->pVnode), __func__, tstrerror(code)); + } else { + tsdbTrace("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__); + } return code; } // APIs int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWriter** ppWriter) { - int32_t code = 0; - int32_t lino = 0; - STsdbSnapWriter* pWriter = NULL; + int32_t code = 0; + int32_t lino = 0; // alloc - pWriter = (STsdbSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter)); + STsdbSnapWriter* pWriter = (STsdbSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter)); if (pWriter == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; TSDB_CHECK_CODE(code, lino, _exit); @@ -1288,11 +1832,6 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr pWriter->pTsdb = pTsdb; pWriter->sver = sver; pWriter->ever = ever; - - code = tsdbFSCopy(pTsdb, &pWriter->fs); - TSDB_CHECK_CODE(code, lino, _exit); - - // config pWriter->minutes = pTsdb->keepCfg.days; pWriter->precision = pTsdb->keepCfg.precision; pWriter->minRow = pTsdb->pVnode->config.tsdbCfg.minRows; @@ -1300,96 +1839,62 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr pWriter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression; pWriter->commitID = pTsdb->pVnode->state.commitID; + code = tsdbFSCopy(pTsdb, &pWriter->fs); + TSDB_CHECK_CODE(code, lino, _exit); + // SNAP_DATA_TSDB - code = tBlockDataCreate(&pWriter->bData); + code = tBlockDataCreate(&pWriter->inData); TSDB_CHECK_CODE(code, lino, _exit); pWriter->fid = INT32_MIN; - pWriter->id = (TABLEID){0}; - // Reader - pWriter->dReader.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); - if (pWriter->dReader.aBlockIdx == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - code = tBlockDataCreate(&pWriter->dReader.bData); - TSDB_CHECK_CODE(code, lino, _exit); - // Writer - pWriter->dWriter.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); - if (pWriter->dWriter.aBlockIdx == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - pWriter->dWriter.aSttBlk = taosArrayInit(0, sizeof(SSttBlk)); - if (pWriter->dWriter.aSttBlk == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - code = tBlockDataCreate(&pWriter->dWriter.bData); + code = tBlockDataCreate(&pWriter->bData); TSDB_CHECK_CODE(code, lino, _exit); - code = tBlockDataCreate(&pWriter->dWriter.sData); + + code = tBlockDataCreate(&pWriter->sData); TSDB_CHECK_CODE(code, lino, _exit); // SNAP_DATA_DEL - pWriter->aDelIdxR = taosArrayInit(0, sizeof(SDelIdx)); - if (pWriter->aDelIdxR == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - pWriter->aDelData = taosArrayInit(0, sizeof(SDelData)); - if (pWriter->aDelData == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - pWriter->aDelIdxW = taosArrayInit(0, sizeof(SDelIdx)); - if (pWriter->aDelIdxW == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } _exit: if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); - *ppWriter = NULL; - + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); if (pWriter) { - if (pWriter->aDelIdxW) taosArrayDestroy(pWriter->aDelIdxW); - if (pWriter->aDelData) taosArrayDestroy(pWriter->aDelData); - if (pWriter->aDelIdxR) taosArrayDestroy(pWriter->aDelIdxR); - tBlockDataDestroy(&pWriter->dWriter.sData, 1); - tBlockDataDestroy(&pWriter->dWriter.bData, 1); - if (pWriter->dWriter.aSttBlk) taosArrayDestroy(pWriter->dWriter.aSttBlk); - if (pWriter->dWriter.aBlockIdx) taosArrayDestroy(pWriter->dWriter.aBlockIdx); - tBlockDataDestroy(&pWriter->dReader.bData, 1); - if (pWriter->dReader.aBlockIdx) taosArrayDestroy(pWriter->dReader.aBlockIdx); + tBlockDataDestroy(&pWriter->sData, 1); tBlockDataDestroy(&pWriter->bData, 1); + tBlockDataDestroy(&pWriter->inData, 1); tsdbFSDestroy(&pWriter->fs); - taosMemoryFree(pWriter); + pWriter = NULL; } } else { - tsdbInfo("vgId:%d, %s done", TD_VID(pTsdb->pVnode), __func__); - *ppWriter = pWriter; + tsdbInfo("vgId:%d %s done, sver:%" PRId64 " ever:%" PRId64, TD_VID(pTsdb->pVnode), __func__, sver, ever); } + *ppWriter = pWriter; return code; } int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* pWriter) { int32_t code = 0; - if (pWriter->dWriter.pWriter) { - code = tsdbSnapWriteCloseFile(pWriter); - if (code) goto _exit; + int32_t lino = 0; + + if (pWriter->pDataFWriter) { + code = tsdbSnapWriteFileDataEnd(pWriter); + TSDB_CHECK_CODE(code, lino, _exit); } - code = tsdbSnapWriteDelEnd(pWriter); - if (code) goto _exit; + if (pWriter->pDelFWriter) { + code = tsdbSnapWriteDelDataEnd(pWriter); + TSDB_CHECK_CODE(code, lino, _exit); + } code = tsdbFSPrepareCommit(pWriter->pTsdb, &pWriter->fs); - if (code) goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { - tsdbError("vgId:%d, %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, tstrerror(code)); + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code)); + } else { + tsdbDebug("vgId:%d %s done", TD_VID(pWriter->pTsdb->pVnode), __func__); } return code; } @@ -1416,26 +1921,17 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) { } // SNAP_DATA_DEL - taosArrayDestroy(pWriter->aDelIdxW); taosArrayDestroy(pWriter->aDelData); - taosArrayDestroy(pWriter->aDelIdxR); + taosArrayDestroy(pWriter->aDelIdx); // SNAP_DATA_TSDB - - // Writer - tBlockDataDestroy(&pWriter->dWriter.sData, 1); - tBlockDataDestroy(&pWriter->dWriter.bData, 1); - taosArrayDestroy(pWriter->dWriter.aSttBlk); - tMapDataClear(&pWriter->dWriter.mDataBlk); - taosArrayDestroy(pWriter->dWriter.aBlockIdx); - - // Reader - tBlockDataDestroy(&pWriter->dReader.bData, 1); - tMapDataClear(&pWriter->dReader.mDataBlk); - taosArrayDestroy(pWriter->dReader.aBlockIdx); - + tBlockDataDestroy(&pWriter->sData, 1); tBlockDataDestroy(&pWriter->bData, 1); + taosArrayDestroy(pWriter->aSttBlk); + tMapDataClear(&pWriter->mDataBlk); + taosArrayDestroy(pWriter->aBlockIdx); tDestroyTSchema(pWriter->skmTable.pTSchema); + tBlockDataDestroy(&pWriter->inData, 1); for (int32_t iBuf = 0; iBuf < sizeof(pWriter->aBuf) / sizeof(uint8_t*); iBuf++) { tFree(pWriter->aBuf[iBuf]); @@ -1453,35 +1949,32 @@ _err: return code; } -int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { - int32_t code = 0; - SSnapDataHdr* pHdr = (SSnapDataHdr*)pData; +int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, SSnapDataHdr* pHdr) { + int32_t code = 0; + int32_t lino = 0; - // ts data if (pHdr->type == SNAP_DATA_TSDB) { - code = tsdbSnapWriteData(pWriter, pData, nData); - if (code) goto _err; - + code = tsdbSnapWriteTimeSeriesData(pWriter, pHdr); + TSDB_CHECK_CODE(code, lino, _exit); goto _exit; - } else { - if (pWriter->dWriter.pWriter) { - code = tsdbSnapWriteCloseFile(pWriter); - if (code) goto _err; - } + } else if (pWriter->pDataFWriter) { + code = tsdbSnapWriteFileDataEnd(pWriter); + TSDB_CHECK_CODE(code, lino, _exit); } - // del data if (pHdr->type == SNAP_DATA_DEL) { - code = tsdbSnapWriteDel(pWriter, pData, nData); - if (code) goto _err; + code = tsdbSnapWriteDelData(pWriter, pHdr); + TSDB_CHECK_CODE(code, lino, _exit); + goto _exit; } _exit: - tsdbDebug("vgId:%d, tsdb snapshot write for %s succeed", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path); - return code; - -_err: - tsdbError("vgId:%d, tsdb snapshot write for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path, - tstrerror(code)); + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s, type:%d index:%" PRId64 " size:%" PRId64, + TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code), pHdr->type, pHdr->index, pHdr->size); + } else { + tsdbDebug("vgId:%d %s done, type:%d index:%" PRId64 " size:%" PRId64, TD_VID(pWriter->pTsdb->pVnode), __func__, + pHdr->type, pHdr->index, pHdr->size); + } return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index ab1096097014bc6f07a9c2f27871bd0d57581445..e2d4b92836d8f6c25cdd4eb5838553fdef167ba5 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -684,7 +684,7 @@ int32_t tRowMergerInit2(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRo tsdbRowGetColVal(pRow, pTSchema, jCol++, pColVal); if ((!COL_VAL_IS_NONE(pColVal)) && (!COL_VAL_IS_NULL(pColVal)) && IS_VAR_DATA_TYPE(pColVal->type)) { uint8_t *pVal = pColVal->value.pData; - + pColVal->value.pData = NULL; code = tRealloc(&pColVal->value.pData, pColVal->value.nData); if (code) goto _exit; @@ -757,7 +757,7 @@ int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) { pTColVal->value.nData = pColVal->value.nData; if (pTColVal->value.nData) { - memcpy(pTColVal->value.pData, pColVal->value.pData, pTColVal->value.nData); + memcpy(pTColVal->value.pData, pColVal->value.pData, pTColVal->value.nData); } pTColVal->flag = 0; } else { @@ -776,7 +776,7 @@ int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) { code = tRealloc(&tColVal->value.pData, pColVal->value.nData); if (code) return code; - tColVal->value.nData = pColVal->value.nData; + tColVal->value.nData = pColVal->value.nData; if (pColVal->value.nData) { memcpy(tColVal->value.pData, pColVal->value.pData, pColVal->value.nData); } @@ -825,7 +825,7 @@ int32_t tRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) { tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal); if ((!COL_VAL_IS_NONE(pColVal)) && (!COL_VAL_IS_NULL(pColVal)) && IS_VAR_DATA_TYPE(pColVal->type)) { uint8_t *pVal = pColVal->value.pData; - + pColVal->value.pData = NULL; code = tRealloc(&pColVal->value.pData, pColVal->value.nData); if (code) goto _exit; @@ -834,7 +834,7 @@ int32_t tRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) { memcpy(pColVal->value.pData, pVal, pColVal->value.nData); } } - + if (taosArrayPush(pMerger->pArray, pColVal) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; @@ -845,7 +845,7 @@ _exit: return code; } -void tRowMergerClear(SRowMerger *pMerger) { +void tRowMergerClear(SRowMerger *pMerger) { for (int32_t iCol = 1; iCol < pMerger->pTSchema->numOfCols; iCol++) { SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol); if (IS_VAR_DATA_TYPE(pTColVal->type)) { @@ -853,7 +853,7 @@ void tRowMergerClear(SRowMerger *pMerger) { } } - taosArrayDestroy(pMerger->pArray); + taosArrayDestroy(pMerger->pArray); } int32_t tRowMerge(SRowMerger *pMerger, TSDBROW *pRow) { @@ -876,7 +876,7 @@ int32_t tRowMerge(SRowMerger *pMerger, TSDBROW *pRow) { pTColVal->value.nData = pColVal->value.nData; if (pTColVal->value.nData) { - memcpy(pTColVal->value.pData, pColVal->value.pData, pTColVal->value.nData); + memcpy(pTColVal->value.pData, pColVal->value.pData, pTColVal->value.nData); } pTColVal->flag = 0; } else { @@ -898,7 +898,7 @@ int32_t tRowMerge(SRowMerger *pMerger, TSDBROW *pRow) { tColVal->value.nData = pColVal->value.nData; if (tColVal->value.nData) { - memcpy(tColVal->value.pData, pColVal->value.pData, tColVal->value.nData); + memcpy(tColVal->value.pData, pColVal->value.pData, tColVal->value.nData); } tColVal->flag = 0; } else { diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index e75dc24329c03ade45e12242fd70a62b963f74ff..43f903dc4867178919e3d3b519b899afcca6d835 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -455,7 +455,7 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) { if (code) goto _err; } - code = tsdbSnapWrite(pWriter->pTsdbSnapWriter, pData, nData); + code = tsdbSnapWrite(pWriter->pTsdbSnapWriter, pHdr); if (code) goto _err; } break; case SNAP_DATA_TQ_HANDLE: {