未验证 提交 856b50eb 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #16746 from taosdata/refact/tsdb_new_snapshot

refact: tsdb new snapshot
...@@ -84,6 +84,8 @@ typedef struct SLDataIter SLDataIter; ...@@ -84,6 +84,8 @@ typedef struct SLDataIter SLDataIter;
#define TSDBKEY_MIN ((TSDBKEY){.ts = TSKEY_MIN, .version = VERSION_MIN}) #define TSDBKEY_MIN ((TSDBKEY){.ts = TSKEY_MIN, .version = VERSION_MIN})
#define TSDBKEY_MAX ((TSDBKEY){.ts = TSKEY_MAX, .version = VERSION_MAX}) #define TSDBKEY_MAX ((TSDBKEY){.ts = TSKEY_MAX, .version = VERSION_MAX})
#define TABLE_SAME_SCHEMA(SUID1, UID1, SUID2, UID2) ((SUID1) ? (SUID1) == (SUID2) : (UID1) == (UID2))
#define PAGE_CONTENT_SIZE(PAGE) ((PAGE) - sizeof(TSCKSUM)) #define PAGE_CONTENT_SIZE(PAGE) ((PAGE) - sizeof(TSCKSUM))
#define LOGIC_TO_FILE_OFFSET(LOFFSET, PAGE) \ #define LOGIC_TO_FILE_OFFSET(LOFFSET, PAGE) \
((LOFFSET) / PAGE_CONTENT_SIZE(PAGE) * (PAGE) + (LOFFSET) % PAGE_CONTENT_SIZE(PAGE)) ((LOFFSET) / PAGE_CONTENT_SIZE(PAGE) * (PAGE) + (LOFFSET) % PAGE_CONTENT_SIZE(PAGE))
...@@ -262,7 +264,7 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS ...@@ -262,7 +264,7 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS
int32_t tsdbDataFWriterClose(SDataFWriter **ppWriter, int8_t sync); int32_t tsdbDataFWriterClose(SDataFWriter **ppWriter, int8_t sync);
int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter); int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter);
int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx); int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx);
int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pMapData, SBlockIdx *pBlockIdx); int32_t tsdbWriteDataBlk(SDataFWriter *pWriter, SMapData *mDataBlk, SBlockIdx *pBlockIdx);
int32_t tsdbWriteSttBlk(SDataFWriter *pWriter, SArray *aSttBlk); int32_t tsdbWriteSttBlk(SDataFWriter *pWriter, SArray *aSttBlk);
int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo, int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo,
int8_t cmprAlg, int8_t toLast); int8_t cmprAlg, int8_t toLast);
...@@ -272,7 +274,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo); ...@@ -272,7 +274,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo);
int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet); int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet);
int32_t tsdbDataFReaderClose(SDataFReader **ppReader); int32_t tsdbDataFReaderClose(SDataFReader **ppReader);
int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx); int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx);
int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *pMapData); int32_t tsdbReadDataBlk(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mDataBlk);
int32_t tsdbReadSttBlk(SDataFReader *pReader, int32_t iStt, SArray *aSttBlk); int32_t tsdbReadSttBlk(SDataFReader *pReader, int32_t iStt, SArray *aSttBlk);
int32_t tsdbReadBlockSma(SDataFReader *pReader, SDataBlk *pBlock, SArray *aColumnDataAgg); int32_t tsdbReadBlockSma(SDataFReader *pReader, SDataBlk *pBlock, SArray *aColumnDataAgg);
int32_t tsdbReadDataBlock(SDataFReader *pReader, SDataBlk *pBlock, SBlockData *pBlockData); int32_t tsdbReadDataBlock(SDataFReader *pReader, SDataBlk *pBlock, SBlockData *pBlockData);
...@@ -658,21 +660,27 @@ typedef struct SMergeTree { ...@@ -658,21 +660,27 @@ typedef struct SMergeTree {
SArray *pIterList; SArray *pIterList;
SLDataIter *pIter; SLDataIter *pIter;
bool destroyLoadInfo; bool destroyLoadInfo;
SSttBlockLoadInfo* pLoadInfo; SSttBlockLoadInfo *pLoadInfo;
const char *idStr; const char *idStr;
} SMergeTree; } SMergeTree;
typedef struct {
int64_t suid;
int64_t uid;
STSchema *pTSchema;
} SSkmInfo;
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid, int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
STimeWindow *pTimeWindow, SVersionRange *pVerRange, void* pLoadInfo, const char* idStr); STimeWindow *pTimeWindow, SVersionRange *pVerRange, void *pLoadInfo, const char *idStr);
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter); void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter);
bool tMergeTreeNext(SMergeTree *pMTree); bool tMergeTreeNext(SMergeTree *pMTree);
TSDBROW tMergeTreeGetRow(SMergeTree *pMTree); TSDBROW tMergeTreeGetRow(SMergeTree *pMTree);
void tMergeTreeClose(SMergeTree *pMTree); void tMergeTreeClose(SMergeTree *pMTree);
SSttBlockLoadInfo* tCreateLastBlockLoadInfo(); SSttBlockLoadInfo *tCreateLastBlockLoadInfo();
void resetLastBlockLoadInfo(SSttBlockLoadInfo* pLoadInfo); void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo);
void getLastBlockLoadInfo(SSttBlockLoadInfo* pLoadInfo, int64_t* blocks, double* el); void getLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, int64_t *blocks, double *el);
void* destroyLastBlockLoadInfo(SSttBlockLoadInfo* pLoadInfo); void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo);
// ========== inline functions ========== // ========== inline functions ==========
static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) { static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) {
......
...@@ -104,7 +104,7 @@ int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* ...@@ -104,7 +104,7 @@ int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq*
int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq); int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq, SArray* tbUidList); int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq, SArray* tbUidList);
int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq, STableMetaRsp** pMetaRsp); int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq, STableMetaRsp** pMetaRsp);
int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids, int64_t *tbUid); int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids, int64_t* tbUid);
int metaTtlDropTable(SMeta* pMeta, int64_t ttl, SArray* tbUids); int metaTtlDropTable(SMeta* pMeta, int64_t ttl, SArray* tbUids);
int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp); int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp);
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline); SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline);
......
...@@ -589,7 +589,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { ...@@ -589,7 +589,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
} }
tMapDataReset(&state->blockMap); tMapDataReset(&state->blockMap);
code = tsdbReadBlock(state->pDataFReader, state->pBlockIdx, &state->blockMap); code = tsdbReadDataBlk(state->pDataFReader, state->pBlockIdx, &state->blockMap);
if (code) goto _err; if (code) goto _err;
state->nBlock = state->blockMap.nItem; state->nBlock = state->blockMap.nItem;
......
...@@ -14,13 +14,8 @@ ...@@ -14,13 +14,8 @@
*/ */
#include "tsdb.h" #include "tsdb.h"
typedef struct {
int64_t suid;
int64_t uid;
STSchema *pTSchema;
} SSkmInfo;
typedef enum { MEMORY_DATA_ITER = 0, LAST_DATA_ITER } EDataIterT; typedef enum { MEMORY_DATA_ITER = 0, STT_DATA_ITER } EDataIterT;
typedef struct { typedef struct {
SRBTreeNode n; SRBTreeNode n;
...@@ -99,7 +94,7 @@ static int32_t tsdbCommitCache(SCommitter *pCommitter); ...@@ -99,7 +94,7 @@ static int32_t tsdbCommitCache(SCommitter *pCommitter);
static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno); static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno);
static int32_t tsdbNextCommitRow(SCommitter *pCommitter); static int32_t tsdbNextCommitRow(SCommitter *pCommitter);
static int32_t tRowInfoCmprFn(const void *p1, const void *p2) { int32_t tRowInfoCmprFn(const void *p1, const void *p2) {
SRowInfo *pInfo1 = (SRowInfo *)p1; SRowInfo *pInfo1 = (SRowInfo *)p1;
SRowInfo *pInfo2 = (SRowInfo *)p2; SRowInfo *pInfo2 = (SRowInfo *)p2;
...@@ -325,22 +320,22 @@ _err: ...@@ -325,22 +320,22 @@ _err:
return code; return code;
} }
static int32_t tsdbCommitterUpdateTableSchema(SCommitter *pCommitter, int64_t suid, int64_t uid) { int32_t tsdbUpdateTableSchema(SMeta *pMeta, int64_t suid, int64_t uid, SSkmInfo *pSkmInfo) {
int32_t code = 0; int32_t code = 0;
if (suid) { if (suid) {
if (pCommitter->skmTable.suid == suid) { if (pSkmInfo->suid == suid) {
pCommitter->skmTable.uid = uid; pSkmInfo->uid = uid;
goto _exit; goto _exit;
} }
} else { } else {
if (pCommitter->skmTable.uid == uid) goto _exit; if (pSkmInfo->uid == uid) goto _exit;
} }
pCommitter->skmTable.suid = suid; pSkmInfo->suid = suid;
pCommitter->skmTable.uid = uid; pSkmInfo->uid = uid;
tTSchemaDestroy(pCommitter->skmTable.pTSchema); tTSchemaDestroy(pSkmInfo->pTSchema);
code = metaGetTbTSchemaEx(pCommitter->pTsdb->pVnode->pMeta, suid, uid, -1, &pCommitter->skmTable.pTSchema); code = metaGetTbTSchemaEx(pMeta, suid, uid, -1, &pSkmInfo->pTSchema);
if (code) goto _exit; if (code) goto _exit;
_exit: _exit:
...@@ -382,7 +377,7 @@ static int32_t tsdbCommitterNextTableData(SCommitter *pCommitter) { ...@@ -382,7 +377,7 @@ static int32_t tsdbCommitterNextTableData(SCommitter *pCommitter) {
pCommitter->dReader.pBlockIdx = pCommitter->dReader.pBlockIdx =
(SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, pCommitter->dReader.iBlockIdx); (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, pCommitter->dReader.iBlockIdx);
code = tsdbReadBlock(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock); code = tsdbReadDataBlk(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock);
if (code) goto _exit; if (code) goto _exit;
ASSERT(pCommitter->dReader.mBlock.nItem > 0); ASSERT(pCommitter->dReader.mBlock.nItem > 0);
...@@ -432,7 +427,7 @@ static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) { ...@@ -432,7 +427,7 @@ static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) {
int8_t iIter = 0; int8_t iIter = 0;
for (int32_t iStt = 0; iStt < pReader->pSet->nSttF; iStt++) { for (int32_t iStt = 0; iStt < pReader->pSet->nSttF; iStt++) {
pIter = &pCommitter->aDataIter[iIter]; pIter = &pCommitter->aDataIter[iIter];
pIter->type = LAST_DATA_ITER; pIter->type = STT_DATA_ITER;
pIter->iStt = iStt; pIter->iStt = iStt;
code = tsdbReadSttBlk(pCommitter->dReader.pReader, iStt, pIter->aSttBlk); code = tsdbReadSttBlk(pCommitter->dReader.pReader, iStt, pIter->aSttBlk);
...@@ -498,7 +493,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { ...@@ -498,7 +493,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
pCommitter->dReader.iBlockIdx = 0; pCommitter->dReader.iBlockIdx = 0;
if (taosArrayGetSize(pCommitter->dReader.aBlockIdx) > 0) { if (taosArrayGetSize(pCommitter->dReader.aBlockIdx) > 0) {
pCommitter->dReader.pBlockIdx = (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, 0); pCommitter->dReader.pBlockIdx = (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, 0);
code = tsdbReadBlock(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock); code = tsdbReadDataBlk(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock);
if (code) goto _err; if (code) goto _err;
} else { } else {
pCommitter->dReader.pBlockIdx = NULL; pCommitter->dReader.pBlockIdx = NULL;
...@@ -556,46 +551,45 @@ _err: ...@@ -556,46 +551,45 @@ _err:
return code; return code;
} }
static int32_t tsdbCommitDataBlock(SCommitter *pCommitter) { int32_t tsdbWriteDataBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SMapData *mDataBlk, int8_t cmprAlg) {
int32_t code = 0; int32_t code = 0;
SBlockData *pBlockData = &pCommitter->dWriter.bData;
SDataBlk block;
ASSERT(pBlockData->nRow > 0); if (pBlockData->nRow == 0) return code;
tDataBlkReset(&block); SDataBlk dataBlk;
tDataBlkReset(&dataBlk);
// info // info
block.nRow += pBlockData->nRow; dataBlk.nRow += pBlockData->nRow;
for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) { for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) {
TSDBKEY key = {.ts = pBlockData->aTSKEY[iRow], .version = pBlockData->aVersion[iRow]}; TSDBKEY key = {.ts = pBlockData->aTSKEY[iRow], .version = pBlockData->aVersion[iRow]};
if (iRow == 0) { if (iRow == 0) {
if (tsdbKeyCmprFn(&block.minKey, &key) > 0) { if (tsdbKeyCmprFn(&dataBlk.minKey, &key) > 0) {
block.minKey = key; dataBlk.minKey = key;
} }
} else { } else {
if (pBlockData->aTSKEY[iRow] == pBlockData->aTSKEY[iRow - 1]) { if (pBlockData->aTSKEY[iRow] == pBlockData->aTSKEY[iRow - 1]) {
block.hasDup = 1; dataBlk.hasDup = 1;
} }
} }
if (iRow == pBlockData->nRow - 1 && tsdbKeyCmprFn(&block.maxKey, &key) < 0) { if (iRow == pBlockData->nRow - 1 && tsdbKeyCmprFn(&dataBlk.maxKey, &key) < 0) {
block.maxKey = key; dataBlk.maxKey = key;
} }
block.minVer = TMIN(block.minVer, key.version); dataBlk.minVer = TMIN(dataBlk.minVer, key.version);
block.maxVer = TMAX(block.maxVer, key.version); dataBlk.maxVer = TMAX(dataBlk.maxVer, key.version);
} }
// write // write
block.nSubBlock++; dataBlk.nSubBlock++;
code = tsdbWriteBlockData(pCommitter->dWriter.pWriter, pBlockData, &block.aSubBlock[block.nSubBlock - 1], code = tsdbWriteBlockData(pWriter, pBlockData, &dataBlk.aSubBlock[dataBlk.nSubBlock - 1],
((block.nSubBlock == 1) && !block.hasDup) ? &block.smaInfo : NULL, pCommitter->cmprAlg, 0); ((dataBlk.nSubBlock == 1) && !dataBlk.hasDup) ? &dataBlk.smaInfo : NULL, cmprAlg, 0);
if (code) goto _err; if (code) goto _err;
// put SDataBlk // put SDataBlk
code = tMapDataPutItem(&pCommitter->dWriter.mBlock, &block, tPutDataBlk); code = tMapDataPutItem(mDataBlk, &dataBlk, tPutDataBlk);
if (code) goto _err; if (code) goto _err;
// clear // clear
...@@ -604,39 +598,38 @@ static int32_t tsdbCommitDataBlock(SCommitter *pCommitter) { ...@@ -604,39 +598,38 @@ static int32_t tsdbCommitDataBlock(SCommitter *pCommitter) {
return code; return code;
_err: _err:
tsdbError("vgId:%d tsdb commit data block failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); tsdbError("vgId:%d tsdb commit data block failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
return code; return code;
} }
static int32_t tsdbCommitLastBlock(SCommitter *pCommitter) { int32_t tsdbWriteSttBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SArray *aSttBlk, int8_t cmprAlg) {
int32_t code = 0; int32_t code = 0;
SSttBlk blockL; SSttBlk sstBlk;
SBlockData *pBlockData = &pCommitter->dWriter.bDatal;
ASSERT(pBlockData->nRow > 0); if (pBlockData->nRow == 0) return code;
// info // info
blockL.suid = pBlockData->suid; sstBlk.suid = pBlockData->suid;
blockL.nRow = pBlockData->nRow; sstBlk.nRow = pBlockData->nRow;
blockL.minKey = TSKEY_MAX; sstBlk.minKey = TSKEY_MAX;
blockL.maxKey = TSKEY_MIN; sstBlk.maxKey = TSKEY_MIN;
blockL.minVer = VERSION_MAX; sstBlk.minVer = VERSION_MAX;
blockL.maxVer = VERSION_MIN; sstBlk.maxVer = VERSION_MIN;
for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) { for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) {
blockL.minKey = TMIN(blockL.minKey, pBlockData->aTSKEY[iRow]); sstBlk.minKey = TMIN(sstBlk.minKey, pBlockData->aTSKEY[iRow]);
blockL.maxKey = TMAX(blockL.maxKey, pBlockData->aTSKEY[iRow]); sstBlk.maxKey = TMAX(sstBlk.maxKey, pBlockData->aTSKEY[iRow]);
blockL.minVer = TMIN(blockL.minVer, pBlockData->aVersion[iRow]); sstBlk.minVer = TMIN(sstBlk.minVer, pBlockData->aVersion[iRow]);
blockL.maxVer = TMAX(blockL.maxVer, pBlockData->aVersion[iRow]); sstBlk.maxVer = TMAX(sstBlk.maxVer, pBlockData->aVersion[iRow]);
} }
blockL.minUid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[0]; sstBlk.minUid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[0];
blockL.maxUid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[pBlockData->nRow - 1]; sstBlk.maxUid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[pBlockData->nRow - 1];
// write // write
code = tsdbWriteBlockData(pCommitter->dWriter.pWriter, pBlockData, &blockL.bInfo, NULL, pCommitter->cmprAlg, 1); code = tsdbWriteBlockData(pWriter, pBlockData, &sstBlk.bInfo, NULL, cmprAlg, 1);
if (code) goto _err; if (code) goto _err;
// push SSttBlk // push SSttBlk
if (taosArrayPush(pCommitter->dWriter.aSttBlk, &blockL) == NULL) { if (taosArrayPush(aSttBlk, &sstBlk) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
...@@ -647,7 +640,7 @@ static int32_t tsdbCommitLastBlock(SCommitter *pCommitter) { ...@@ -647,7 +640,7 @@ static int32_t tsdbCommitLastBlock(SCommitter *pCommitter) {
return code; return code;
_err: _err:
tsdbError("vgId:%d tsdb commit last block failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); tsdbError("vgId:%d tsdb commit last block failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
return code; return code;
} }
...@@ -692,7 +685,7 @@ static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) { ...@@ -692,7 +685,7 @@ static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) {
while (pCommitter->dReader.pBlockIdx && tTABLEIDCmprFn(pCommitter->dReader.pBlockIdx, &toTable) < 0) { while (pCommitter->dReader.pBlockIdx && tTABLEIDCmprFn(pCommitter->dReader.pBlockIdx, &toTable) < 0) {
SBlockIdx blockIdx = *pCommitter->dReader.pBlockIdx; SBlockIdx blockIdx = *pCommitter->dReader.pBlockIdx;
code = tsdbWriteBlock(pCommitter->dWriter.pWriter, &pCommitter->dReader.mBlock, &blockIdx); code = tsdbWriteDataBlk(pCommitter->dWriter.pWriter, &pCommitter->dReader.mBlock, &blockIdx);
if (code) goto _err; if (code) goto _err;
if (taosArrayPush(pCommitter->dWriter.aBlockIdx, &blockIdx) == NULL) { if (taosArrayPush(pCommitter->dWriter.aBlockIdx, &blockIdx) == NULL) {
...@@ -1046,7 +1039,7 @@ static int32_t tsdbNextCommitRow(SCommitter *pCommitter) { ...@@ -1046,7 +1039,7 @@ static int32_t tsdbNextCommitRow(SCommitter *pCommitter) {
break; break;
} }
} }
} else if (pCommitter->pIter->type == LAST_DATA_ITER) { // last file } else if (pCommitter->pIter->type == STT_DATA_ITER) { // last file
pIter->iRow++; pIter->iRow++;
if (pIter->iRow < pIter->bData.nRow) { if (pIter->iRow < pIter->bData.nRow) {
pIter->r.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[pIter->iRow]; pIter->r.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[pIter->iRow];
...@@ -1124,15 +1117,14 @@ static int32_t tsdbCommitAheadBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) ...@@ -1124,15 +1117,14 @@ static int32_t tsdbCommitAheadBlock(SCommitter *pCommitter, SDataBlk *pDataBlk)
} }
if (pBlockData->nRow >= pCommitter->maxRow) { if (pBlockData->nRow >= pCommitter->maxRow) {
code = tsdbCommitDataBlock(pCommitter); code =
tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBlockData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
if (code) goto _err; if (code) goto _err;
} }
} }
if (pBlockData->nRow) { code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBlockData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
code = tsdbCommitDataBlock(pCommitter);
if (code) goto _err; if (code) goto _err;
}
return code; return code;
...@@ -1193,7 +1185,7 @@ static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) ...@@ -1193,7 +1185,7 @@ static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SDataBlk *pDataBlk)
} }
if (pBDataW->nRow >= pCommitter->maxRow) { if (pBDataW->nRow >= pCommitter->maxRow) {
code = tsdbCommitDataBlock(pCommitter); code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBDataW, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
if (code) goto _err; if (code) goto _err;
} }
} }
...@@ -1210,15 +1202,13 @@ static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) ...@@ -1210,15 +1202,13 @@ static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SDataBlk *pDataBlk)
} }
if (pBDataW->nRow >= pCommitter->maxRow) { if (pBDataW->nRow >= pCommitter->maxRow) {
code = tsdbCommitDataBlock(pCommitter); code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBDataW, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
if (code) goto _err; if (code) goto _err;
} }
} }
if (pBDataW->nRow) { code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBDataW, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
code = tsdbCommitDataBlock(pCommitter);
if (code) goto _err; if (code) goto _err;
}
return code; return code;
...@@ -1306,10 +1296,8 @@ static int32_t tsdbInitLastBlockIfNeed(SCommitter *pCommitter, TABLEID id) { ...@@ -1306,10 +1296,8 @@ static int32_t tsdbInitLastBlockIfNeed(SCommitter *pCommitter, TABLEID id) {
SBlockData *pBDatal = &pCommitter->dWriter.bDatal; SBlockData *pBDatal = &pCommitter->dWriter.bDatal;
if (pBDatal->suid || pBDatal->uid) { if (pBDatal->suid || pBDatal->uid) {
if ((pBDatal->suid != id.suid) || (id.suid == 0)) { if ((pBDatal->suid != id.suid) || (id.suid == 0)) {
if (pBDatal->nRow) { code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, pBDatal, pCommitter->dWriter.aSttBlk, pCommitter->cmprAlg);
code = tsdbCommitLastBlock(pCommitter);
if (code) goto _exit; if (code) goto _exit;
}
tBlockDataReset(pBDatal); tBlockDataReset(pBDatal);
} }
} }
...@@ -1341,7 +1329,7 @@ static int32_t tsdbAppendLastBlock(SCommitter *pCommitter) { ...@@ -1341,7 +1329,7 @@ static int32_t tsdbAppendLastBlock(SCommitter *pCommitter) {
if (code) goto _err; if (code) goto _err;
if (pBDatal->nRow >= pCommitter->maxRow) { if (pBDatal->nRow >= pCommitter->maxRow) {
code = tsdbCommitLastBlock(pCommitter); code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, pBDatal, pCommitter->dWriter.aSttBlk, pCommitter->cmprAlg);
if (code) goto _err; if (code) goto _err;
} }
} }
...@@ -1393,10 +1381,11 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) { ...@@ -1393,10 +1381,11 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) {
if (pBData->nRow >= pCommitter->maxRow) { if (pBData->nRow >= pCommitter->maxRow) {
if (pCommitter->toLastOnly) { if (pCommitter->toLastOnly) {
code = tsdbCommitLastBlock(pCommitter); code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, pBData, pCommitter->dWriter.aSttBlk, pCommitter->cmprAlg);
if (code) goto _err; if (code) goto _err;
} else { } else {
code = tsdbCommitDataBlock(pCommitter); code =
tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
if (code) goto _err; if (code) goto _err;
} }
} }
...@@ -1404,7 +1393,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) { ...@@ -1404,7 +1393,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) {
if (!pCommitter->toLastOnly && pBData->nRow) { if (!pCommitter->toLastOnly && pBData->nRow) {
if (pBData->nRow > pCommitter->minRow) { if (pBData->nRow > pCommitter->minRow) {
code = tsdbCommitDataBlock(pCommitter); code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
if (code) goto _err; if (code) goto _err;
} else { } else {
code = tsdbAppendLastBlock(pCommitter); code = tsdbAppendLastBlock(pCommitter);
...@@ -1437,7 +1426,7 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { ...@@ -1437,7 +1426,7 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
tMapDataReset(&pCommitter->dWriter.mBlock); tMapDataReset(&pCommitter->dWriter.mBlock);
// impl // impl
code = tsdbCommitterUpdateTableSchema(pCommitter, id.suid, id.uid); code = tsdbUpdateTableSchema(pCommitter->pTsdb->pVnode->pMeta, id.suid, id.uid, &pCommitter->skmTable);
if (code) goto _err; if (code) goto _err;
code = tBlockDataInit(&pCommitter->dReader.bData, id.suid, id.uid, pCommitter->skmTable.pTSchema); code = tBlockDataInit(&pCommitter->dReader.bData, id.suid, id.uid, pCommitter->skmTable.pTSchema);
if (code) goto _err; if (code) goto _err;
...@@ -1455,7 +1444,7 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { ...@@ -1455,7 +1444,7 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
// end // end
if (pCommitter->dWriter.mBlock.nItem > 0) { if (pCommitter->dWriter.mBlock.nItem > 0) {
SBlockIdx blockIdx = {.suid = id.suid, .uid = id.uid}; SBlockIdx blockIdx = {.suid = id.suid, .uid = id.uid};
code = tsdbWriteBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.mBlock, &blockIdx); code = tsdbWriteDataBlk(pCommitter->dWriter.pWriter, &pCommitter->dWriter.mBlock, &blockIdx);
if (code) goto _err; if (code) goto _err;
if (taosArrayPush(pCommitter->dWriter.aBlockIdx, &blockIdx) == NULL) { if (taosArrayPush(pCommitter->dWriter.aBlockIdx, &blockIdx) == NULL) {
...@@ -1470,10 +1459,9 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { ...@@ -1470,10 +1459,9 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
code = tsdbMoveCommitData(pCommitter, id); code = tsdbMoveCommitData(pCommitter, id);
if (code) goto _err; if (code) goto _err;
if (pCommitter->dWriter.bDatal.nRow > 0) { code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bDatal, pCommitter->dWriter.aSttBlk,
code = tsdbCommitLastBlock(pCommitter); pCommitter->cmprAlg);
if (code) goto _err; if (code) goto _err;
}
return code; return code;
......
...@@ -231,10 +231,10 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK ...@@ -231,10 +231,10 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK
STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid}; STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid};
if (ASCENDING_TRAVERSE(pTsdbReader->order)) { if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
int64_t skey = pTsdbReader->window.skey; int64_t skey = pTsdbReader->window.skey;
info.lastKey = (skey > INT64_MIN)? (skey - 1):skey; info.lastKey = (skey > INT64_MIN) ? (skey - 1) : skey;
} else { } else {
int64_t ekey = pTsdbReader->window.ekey; int64_t ekey = pTsdbReader->window.ekey;
info.lastKey = (ekey < INT64_MAX)? (ekey + 1):ekey; info.lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
} }
taosHashPut(pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info)); taosHashPut(pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info));
...@@ -601,7 +601,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN ...@@ -601,7 +601,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(int64_t)); STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(int64_t));
tMapDataReset(&pScanInfo->mapData); tMapDataReset(&pScanInfo->mapData);
tsdbReadBlock(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData); tsdbReadDataBlk(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData);
sizeInDisk += pScanInfo->mapData.nData; sizeInDisk += pScanInfo->mapData.nData;
for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) { for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) {
...@@ -1933,7 +1933,7 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan ...@@ -1933,7 +1933,7 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan
initMemDataIterator(pScanInfo, pReader); initMemDataIterator(pScanInfo, pReader);
pLBlockReader->uid = pScanInfo->uid; pLBlockReader->uid = pScanInfo->uid;
int32_t step = ASCENDING_TRAVERSE(pLBlockReader->order)? 1:-1; int32_t step = ASCENDING_TRAVERSE(pLBlockReader->order) ? 1 : -1;
STimeWindow w = pLBlockReader->window; STimeWindow w = pLBlockReader->window;
if (ASCENDING_TRAVERSE(pLBlockReader->order)) { if (ASCENDING_TRAVERSE(pLBlockReader->order)) {
w.skey = pScanInfo->lastKey + step; w.skey = pScanInfo->lastKey + step;
...@@ -3621,7 +3621,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { ...@@ -3621,7 +3621,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader); initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
resetDataBlockIterator(&pReader->status.blockIter, pReader->order); resetDataBlockIterator(&pReader->status.blockIter, pReader->order);
int64_t ts = ASCENDING_TRAVERSE(pReader->order)?pReader->window.skey-1:pReader->window.ekey+1; int64_t ts = ASCENDING_TRAVERSE(pReader->order) ? pReader->window.skey - 1 : pReader->window.ekey + 1;
resetDataBlockScanInfo(pReader->status.pTableMap, ts); resetDataBlockScanInfo(pReader->status.pTableMap, ts);
int32_t code = 0; int32_t code = 0;
......
...@@ -418,21 +418,21 @@ _err: ...@@ -418,21 +418,21 @@ _err:
return code; return code;
} }
int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *mBlock, SBlockIdx *pBlockIdx) { int32_t tsdbWriteDataBlk(SDataFWriter *pWriter, SMapData *mDataBlk, SBlockIdx *pBlockIdx) {
int32_t code = 0; int32_t code = 0;
SHeadFile *pHeadFile = &pWriter->fHead; SHeadFile *pHeadFile = &pWriter->fHead;
int64_t size; int64_t size;
int64_t n; int64_t n;
ASSERT(mBlock->nItem > 0); ASSERT(mDataBlk->nItem > 0);
// alloc // alloc
size = tPutMapData(NULL, mBlock); size = tPutMapData(NULL, mDataBlk);
code = tRealloc(&pWriter->aBuf[0], size); code = tRealloc(&pWriter->aBuf[0], size);
if (code) goto _err; if (code) goto _err;
// build // build
n = tPutMapData(pWriter->aBuf[0], mBlock); n = tPutMapData(pWriter->aBuf[0], mDataBlk);
// write // write
code = tsdbWriteFile(pWriter->pHeadFD, pHeadFile->size, pWriter->aBuf[0], size); code = tsdbWriteFile(pWriter->pHeadFD, pHeadFile->size, pWriter->aBuf[0], size);
...@@ -446,7 +446,7 @@ int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *mBlock, SBlockIdx *pBloc ...@@ -446,7 +446,7 @@ int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *mBlock, SBlockIdx *pBloc
tsdbTrace("vgId:%d, write block, file ID:%d commit ID:%d suid:%" PRId64 " uid:%" PRId64 " offset:%" PRId64 tsdbTrace("vgId:%d, write block, file ID:%d commit ID:%d suid:%" PRId64 " uid:%" PRId64 " offset:%" PRId64
" size:%" PRId64 " nItem:%d", " size:%" PRId64 " nItem:%d",
TD_VID(pWriter->pTsdb->pVnode), pWriter->wSet.fid, pHeadFile->commitID, pBlockIdx->suid, pBlockIdx->uid, TD_VID(pWriter->pTsdb->pVnode), pWriter->wSet.fid, pHeadFile->commitID, pBlockIdx->suid, pBlockIdx->uid,
pBlockIdx->offset, pBlockIdx->size, mBlock->nItem); pBlockIdx->offset, pBlockIdx->size, mDataBlk->nItem);
return code; return code;
_err: _err:
...@@ -872,7 +872,7 @@ _err: ...@@ -872,7 +872,7 @@ _err:
return code; return code;
} }
int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBlock) { int32_t tsdbReadDataBlk(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mDataBlk) {
int32_t code = 0; int32_t code = 0;
int64_t offset = pBlockIdx->offset; int64_t offset = pBlockIdx->offset;
int64_t size = pBlockIdx->size; int64_t size = pBlockIdx->size;
...@@ -886,7 +886,7 @@ int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBl ...@@ -886,7 +886,7 @@ int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBl
if (code) goto _err; if (code) goto _err;
// decode // decode
int64_t n = tGetMapData(pReader->aBuf[0], mBlock); int64_t n = tGetMapData(pReader->aBuf[0], mDataBlk);
if (n < 0) { if (n < 0) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
...@@ -1053,6 +1053,29 @@ _err: ...@@ -1053,6 +1053,29 @@ _err:
return code; return code;
} }
int32_t tsdbReadDataBlockEx(SDataFReader *pReader, SDataBlk *pDataBlk, SBlockData *pBlockData) {
int32_t code = 0;
SBlockInfo *pBlockInfo = &pDataBlk->aSubBlock[0];
// alloc
code = tRealloc(&pReader->aBuf[0], pBlockInfo->szBlock);
if (code) goto _err;
// read
code = tsdbReadFile(pReader->pDataFD, pBlockInfo->offset, pReader->aBuf[0], pBlockInfo->szBlock);
if (code) goto _err;
// decmpr
code = tDecmprBlockData(pReader->aBuf[0], pBlockInfo->szBlock, pBlockData, &pReader->aBuf[1]);
if (code) goto _err;
return code;
_err:
tsdbError("vgId:%d tsdb read data block ex failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
return code;
}
int32_t tsdbReadDataBlock(SDataFReader *pReader, SDataBlk *pDataBlk, SBlockData *pBlockData) { int32_t tsdbReadDataBlock(SDataFReader *pReader, SDataBlk *pDataBlk, SBlockData *pBlockData) {
int32_t code = 0; int32_t code = 0;
...@@ -1147,8 +1170,8 @@ int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb ...@@ -1147,8 +1170,8 @@ int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb
pDelFWriter->fDel = *pFile; pDelFWriter->fDel = *pFile;
tsdbDelFileName(pTsdb, pFile, fname); tsdbDelFileName(pTsdb, pFile, fname);
code = int32_t flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE;
tsdbOpenFile(fname, TSDB_DEFAULT_PAGE_SIZE, TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE, &pDelFWriter->pWriteH); code = tsdbOpenFile(fname, TSDB_DEFAULT_PAGE_SIZE, flag, &pDelFWriter->pWriteH);
if (code) goto _err; if (code) goto _err;
// update header // update header
......
...@@ -16,6 +16,29 @@ ...@@ -16,6 +16,29 @@
#include "tsdb.h" #include "tsdb.h"
// STsdbSnapReader ======================================== // STsdbSnapReader ========================================
typedef enum { SNAP_DATA_FILE_ITER = 0, SNAP_STT_FILE_ITER } EFIterT;
typedef struct {
SRBTreeNode n;
SRowInfo rInfo;
EFIterT type;
union {
struct {
SArray* aBlockIdx;
int32_t iBlockIdx;
SBlockIdx* pBlockIdx;
SMapData mBlock;
int32_t iBlock;
}; // .data file
struct {
int32_t iStt;
SArray* aSttBlk;
int32_t iSttBlk;
}; // .stt file
};
SBlockData bData;
int32_t iRow;
} SFDataIter;
struct STsdbSnapReader { struct STsdbSnapReader {
STsdb* pTsdb; STsdb* pTsdb;
int64_t sver; int64_t sver;
...@@ -26,146 +49,301 @@ struct STsdbSnapReader { ...@@ -26,146 +49,301 @@ struct STsdbSnapReader {
int8_t dataDone; int8_t dataDone;
int32_t fid; int32_t fid;
SDataFReader* pDataFReader; SDataFReader* pDataFReader;
SArray* aBlockIdx; // SArray<SBlockIdx> SFDataIter* pIter;
SArray* aSstBlk; // SArray<SSttBlk> SRBTree rbt;
SBlockIdx* pBlockIdx; SFDataIter aFDataIter[TSDB_MAX_STT_FILE + 1];
SSttBlk* pSstBlk; SBlockData bData;
SSkmInfo skmTable;
int32_t iBlockIdx;
int32_t iBlockL;
SMapData mBlock; // SMapData<SDataBlk>
int32_t iBlock;
SBlockData oBlockData;
SBlockData nBlockData;
// for del file // for del file
int8_t delDone; int8_t delDone;
SDelFReader* pDelFReader; SDelFReader* pDelFReader;
SArray* aDelIdx; // SArray<SDelIdx> SArray* aDelIdx; // SArray<SDelIdx>
int32_t iDelIdx; int32_t iDelIdx;
SArray* aDelData; // SArray<SDelData> SArray* aDelData; // SArray<SDelData>
uint8_t* aBuf[5];
}; };
static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) { extern int32_t tRowInfoCmprFn(const void* p1, const void* p2);
extern int32_t tsdbReadDataBlockEx(SDataFReader* pReader, SDataBlk* pDataBlk, SBlockData* pBlockData);
extern int32_t tsdbUpdateTableSchema(SMeta* pMeta, int64_t suid, int64_t uid, SSkmInfo* pSkmInfo);
static int32_t tsdbSnapReadOpenFile(STsdbSnapReader* pReader) {
int32_t code = 0; int32_t code = 0;
STsdb* pTsdb = pReader->pTsdb;
while (true) {
if (pReader->pDataFReader == NULL) {
// next
SDFileSet dFileSet = {.fid = pReader->fid}; SDFileSet dFileSet = {.fid = pReader->fid};
SDFileSet* pSet = taosArraySearch(pReader->fs.aDFileSet, &dFileSet, tDFileSetCmprFn, TD_GT); SDFileSet* pSet = taosArraySearch(pReader->fs.aDFileSet, &dFileSet, tDFileSetCmprFn, TD_GT);
if (pSet == NULL) goto _exit; if (pSet == NULL) return code;
pReader->fid = pSet->fid;
// load pReader->fid = pSet->fid;
code = tsdbDataFReaderOpen(&pReader->pDataFReader, pTsdb, pSet); code = tsdbDataFReaderOpen(&pReader->pDataFReader, pReader->pTsdb, pSet);
if (code) goto _err; if (code) goto _err;
code = tsdbReadBlockIdx(pReader->pDataFReader, pReader->aBlockIdx); pReader->pIter = NULL;
tRBTreeCreate(&pReader->rbt, tRowInfoCmprFn);
// .data file
SFDataIter* pIter = &pReader->aFDataIter[0];
pIter->type = SNAP_DATA_FILE_ITER;
code = tsdbReadBlockIdx(pReader->pDataFReader, pIter->aBlockIdx);
if (code) goto _err; if (code) goto _err;
code = tsdbReadSttBlk(pReader->pDataFReader, 0, pReader->aSstBlk); for (pIter->iBlockIdx = 0; pIter->iBlockIdx < taosArrayGetSize(pIter->aBlockIdx); pIter->iBlockIdx++) {
pIter->pBlockIdx = (SBlockIdx*)taosArrayGet(pIter->aBlockIdx, pIter->iBlockIdx);
code = tsdbReadDataBlk(pReader->pDataFReader, pIter->pBlockIdx, &pIter->mBlock);
if (code) goto _err; if (code) goto _err;
// init for (pIter->iBlock = 0; pIter->iBlock < pIter->mBlock.nItem; pIter->iBlock++) {
pReader->iBlockIdx = 0; SDataBlk dataBlk;
if (pReader->iBlockIdx < taosArrayGetSize(pReader->aBlockIdx)) { tMapDataGetItemByIdx(&pIter->mBlock, pIter->iBlock, &dataBlk, tGetDataBlk);
pReader->pBlockIdx = (SBlockIdx*)taosArrayGet(pReader->aBlockIdx, pReader->iBlockIdx);
if (dataBlk.minVer > pReader->ever || dataBlk.maxVer < pReader->sver) continue;
code = tsdbReadBlock(pReader->pDataFReader, pReader->pBlockIdx, &pReader->mBlock); code = tsdbReadDataBlockEx(pReader->pDataFReader, &dataBlk, &pIter->bData);
if (code) goto _err; if (code) goto _err;
pReader->iBlock = 0; ASSERT(pIter->pBlockIdx->suid == pIter->bData.suid);
} else { ASSERT(pIter->pBlockIdx->uid == pIter->bData.uid);
pReader->pBlockIdx = NULL;
for (pIter->iRow = 0; pIter->iRow < pIter->bData.nRow; pIter->iRow++) {
int64_t rowVer = pIter->bData.aVersion[pIter->iRow];
if (rowVer >= pReader->sver && rowVer <= pReader->ever) {
pIter->rInfo.suid = pIter->pBlockIdx->suid;
pIter->rInfo.uid = pIter->pBlockIdx->uid;
pIter->rInfo.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow);
goto _add_iter_and_break;
}
}
} }
pReader->iBlockL = 0; continue;
while (true) {
if (pReader->iBlockL >= taosArrayGetSize(pReader->aSstBlk)) { _add_iter_and_break:
pReader->pSstBlk = NULL; tRBTreePut(&pReader->rbt, (SRBTreeNode*)pIter);
break; break;
} }
pReader->pSstBlk = (SSttBlk*)taosArrayGet(pReader->aSstBlk, pReader->iBlockL); // .stt file
if (pReader->pSstBlk->minVer <= pReader->ever && pReader->pSstBlk->maxVer >= pReader->sver) { pIter = &pReader->aFDataIter[1];
// TODO for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
break; pIter->type = SNAP_STT_FILE_ITER;
pIter->iStt = iStt;
code = tsdbReadSttBlk(pReader->pDataFReader, iStt, pIter->aSttBlk);
if (code) goto _err;
for (pIter->iSttBlk = 0; pIter->iSttBlk < taosArrayGetSize(pIter->aSttBlk); pIter->iSttBlk++) {
SSttBlk* pSttBlk = (SSttBlk*)taosArrayGet(pIter->aSttBlk, pIter->iSttBlk);
if (pSttBlk->minVer > pReader->ever) continue;
if (pSttBlk->maxVer < pReader->sver) continue;
code = tsdbReadSttBlock(pReader->pDataFReader, iStt, pSttBlk, &pIter->bData);
if (code) goto _err;
for (pIter->iRow = 0; pIter->iRow < pIter->bData.nRow; pIter->iRow++) {
int64_t rowVer = pIter->bData.aVersion[pIter->iRow];
if (rowVer >= pReader->sver && rowVer <= pReader->ever) {
pIter->rInfo.suid = pIter->bData.suid;
pIter->rInfo.uid = pIter->bData.uid;
pIter->rInfo.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow);
goto _add_iter;
} }
}
}
continue;
pReader->iBlockL++; _add_iter:
tRBTreePut(&pReader->rbt, (SRBTreeNode*)pIter);
pIter++;
} }
tsdbInfo("vgId:%d, vnode snapshot tsdb open data file to read for %s, fid:%d", TD_VID(pTsdb->pVnode), pTsdb->path, tsdbInfo("vgId:%d, vnode snapshot tsdb open data file to read for %s, fid:%d", TD_VID(pReader->pTsdb->pVnode),
pReader->fid); pReader->pTsdb->path, pReader->fid);
return code;
_err:
tsdbError("vgId:%d vnode snapshot tsdb snap read open file failed since %s", TD_VID(pReader->pTsdb->pVnode),
tstrerror(code));
return code;
}
static SRowInfo* tsdbSnapGetRow(STsdbSnapReader* pReader) { return pReader->pIter ? &pReader->pIter->rInfo : NULL; }
static int32_t tsdbSnapNextRow(STsdbSnapReader* pReader) {
int32_t code = 0;
if (pReader->pIter) {
SFDataIter* pIter = pReader->pIter;
while (true) {
_find_row:
for (pIter->iRow++; pIter->iRow < pIter->bData.nRow; pIter->iRow++) {
int64_t rowVer = pIter->bData.aVersion[pIter->iRow];
if (rowVer >= pReader->sver && rowVer <= pReader->ever) {
pIter->rInfo.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[pIter->iRow];
pIter->rInfo.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow);
goto _out;
}
} }
if (pIter->type == SNAP_DATA_FILE_ITER) {
while (true) { while (true) {
if (pReader->pBlockIdx && pReader->pSstBlk) { for (pIter->iBlock++; pIter->iBlock < pIter->mBlock.nItem; pIter->iBlock++) {
TABLEID id = {.suid = pReader->pSstBlk->suid, .uid = pReader->pSstBlk->minUid}; SDataBlk dataBlk;
tMapDataGetItemByIdx(&pIter->mBlock, pIter->iBlock, &dataBlk, tGetDataBlk);
ASSERT(0); if (dataBlk.minVer > pReader->ever || dataBlk.maxVer < pReader->sver) continue;
// if (tTABLEIDCmprFn(pReader->pBlockIdx, &minId) < 0) { code = tsdbReadDataBlockEx(pReader->pDataFReader, &dataBlk, &pIter->bData);
// // TODO if (code) goto _err;
// } else if (tTABLEIDCmprFn(pReader->pBlockIdx, &maxId) < 0) {
// // TODO
// } else {
// // TODO
// }
} else if (pReader->pBlockIdx) {
while (pReader->iBlock < pReader->mBlock.nItem) {
SDataBlk block;
tMapDataGetItemByIdx(&pReader->mBlock, pReader->iBlock, &block, tGetDataBlk);
if (block.minVer <= pReader->ever && block.maxVer >= pReader->sver) { pIter->iRow = -1;
// load data (todo) goto _find_row;
} }
// next pIter->iBlockIdx++;
pReader->iBlock++; if (pIter->iBlockIdx >= taosArrayGetSize(pIter->aBlockIdx)) break;
if (*ppData) break;
pIter->pBlockIdx = (SBlockIdx*)taosArrayGet(pIter->aBlockIdx, pIter->iBlockIdx);
code = tsdbReadDataBlk(pReader->pDataFReader, pIter->pBlockIdx, &pIter->mBlock);
if (code) goto _err;
pIter->iBlock = -1;
} }
if (pReader->iBlock >= pReader->mBlock.nItem) { pReader->pIter = NULL;
pReader->iBlockIdx++; } else if (pIter->type == SNAP_STT_FILE_ITER) {
if (pReader->iBlockIdx < taosArrayGetSize(pReader->aBlockIdx)) { for (pIter->iSttBlk++; pIter->iSttBlk < taosArrayGetSize(pIter->aSttBlk); pIter->iSttBlk++) {
pReader->pBlockIdx = (SBlockIdx*)taosArrayGet(pReader->aBlockIdx, pReader->iBlockIdx); SSttBlk* pSttBlk = (SSttBlk*)taosArrayGet(pIter->aSttBlk, pIter->iSttBlk);
if (pSttBlk->minVer > pReader->ever || pSttBlk->maxVer < pReader->sver) continue;
code = tsdbReadBlock(pReader->pDataFReader, pReader->pBlockIdx, &pReader->mBlock); code = tsdbReadSttBlock(pReader->pDataFReader, pIter->iStt, pSttBlk, &pIter->bData);
if (code) goto _err; if (code) goto _err;
pReader->iBlock = 0; pIter->iRow = -1;
goto _find_row;
}
pReader->pIter = NULL;
} else { } else {
pReader->pBlockIdx = NULL; ASSERT(0);
} }
} }
if (*ppData) goto _exit; _out:
} else if (pReader->pSstBlk) { pIter = (SFDataIter*)tRBTreeMin(&pReader->rbt);
while (pReader->pSstBlk) { if (pReader->pIter && pIter) {
if (pReader->pSstBlk->minVer <= pReader->ever && pReader->pSstBlk->maxVer >= pReader->sver) { int32_t c = tRowInfoCmprFn(&pReader->pIter->rInfo, &pIter->rInfo);
// load data (todo) if (c > 0) {
tRBTreePut(&pReader->rbt, (SRBTreeNode*)pReader->pIter);
pReader->pIter = NULL;
} else {
ASSERT(c);
}
}
} }
// next if (pReader->pIter == NULL) {
pReader->iBlockL++; pReader->pIter = (SFDataIter*)tRBTreeMin(&pReader->rbt);
if (pReader->iBlockL < taosArrayGetSize(pReader->aSstBlk)) { if (pReader->pIter) {
pReader->pSstBlk = (SSttBlk*)taosArrayGetSize(pReader->aSstBlk); tRBTreeDrop(&pReader->rbt, (SRBTreeNode*)pReader->pIter);
} else { }
pReader->pSstBlk = NULL;
} }
if (*ppData) goto _exit; return code;
_err:
return code;
}
static int32_t tsdbSnapCmprData(STsdbSnapReader* pReader, uint8_t** ppData) {
int32_t code = 0;
ASSERT(pReader->bData.nRow);
int32_t aBufN[5] = {0};
code = tCmprBlockData(&pReader->bData, TWO_STAGE_COMP, NULL, NULL, pReader->aBuf, aBufN);
if (code) goto _exit;
int32_t size = aBufN[0] + aBufN[1] + aBufN[2] + aBufN[3];
*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + size);
if (*ppData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
} }
} else {
SSnapDataHdr* pHdr = (SSnapDataHdr*)*ppData;
pHdr->type = SNAP_DATA_TSDB;
pHdr->size = size;
memcpy(pHdr->data, pReader->aBuf[3], aBufN[3]);
memcpy(pHdr->data + aBufN[3], pReader->aBuf[2], aBufN[2]);
if (aBufN[1]) {
memcpy(pHdr->data + aBufN[3] + aBufN[2], pReader->aBuf[1], aBufN[1]);
}
if (aBufN[0]) {
memcpy(pHdr->data + aBufN[3] + aBufN[2] + aBufN[1], pReader->aBuf[0], aBufN[0]);
}
_exit:
return code;
}
static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
int32_t code = 0;
STsdb* pTsdb = pReader->pTsdb;
while (true) {
if (pReader->pDataFReader == NULL) {
code = tsdbSnapReadOpenFile(pReader);
if (code) goto _err;
}
if (pReader->pDataFReader == NULL) break;
SRowInfo* pRowInfo = tsdbSnapGetRow(pReader);
if (pRowInfo == NULL) {
tsdbDataFReaderClose(&pReader->pDataFReader);
continue;
}
TABLEID id = {.suid = pRowInfo->suid, .uid = pRowInfo->uid};
SBlockData* pBlockData = &pReader->bData;
code = tsdbUpdateTableSchema(pTsdb->pVnode->pMeta, id.suid, id.uid, &pReader->skmTable);
if (code) goto _err;
code = tBlockDataInit(pBlockData, id.suid, id.uid, pReader->skmTable.pTSchema);
if (code) goto _err;
while (pRowInfo->suid == id.suid && pRowInfo->uid == id.uid) {
code = tBlockDataAppendRow(pBlockData, &pRowInfo->row, NULL, pRowInfo->uid);
if (code) goto _err;
code = tsdbSnapNextRow(pReader);
if (code) goto _err;
pRowInfo = tsdbSnapGetRow(pReader);
if (pRowInfo == NULL) {
tsdbDataFReaderClose(&pReader->pDataFReader); tsdbDataFReaderClose(&pReader->pDataFReader);
break; break;
} }
if (pBlockData->nRow >= 4096) break;
} }
code = tsdbSnapCmprData(pReader, ppData);
if (code) goto _err;
break;
} }
_exit:
return code; return code;
_err: _err:
...@@ -216,7 +394,6 @@ static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) { ...@@ -216,7 +394,6 @@ static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) {
size += tPutDelData(NULL, pDelData); size += tPutDelData(NULL, pDelData);
} }
} }
if (size == 0) continue; if (size == 0) continue;
// org data // org data
...@@ -292,23 +469,33 @@ int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type ...@@ -292,23 +469,33 @@ int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type
goto _err; goto _err;
} }
// data
pReader->fid = INT32_MIN; pReader->fid = INT32_MIN;
pReader->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); for (int32_t iIter = 0; iIter < sizeof(pReader->aFDataIter) / sizeof(pReader->aFDataIter[0]); iIter++) {
if (pReader->aBlockIdx == NULL) { SFDataIter* pIter = &pReader->aFDataIter[iIter];
if (iIter == 0) {
pIter->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
if (pIter->aBlockIdx == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
pReader->aSstBlk = taosArrayInit(0, sizeof(SSttBlk)); } else {
if (pReader->aSstBlk == NULL) { pIter->aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
if (pIter->aSttBlk == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
pReader->mBlock = tMapDataInit(); }
code = tBlockDataCreate(&pReader->oBlockData);
code = tBlockDataCreate(&pIter->bData);
if (code) goto _err; if (code) goto _err;
code = tBlockDataCreate(&pReader->nBlockData); }
code = tBlockDataCreate(&pReader->bData);
if (code) goto _err; if (code) goto _err;
// del
pReader->aDelIdx = taosArrayInit(0, sizeof(SDelIdx)); pReader->aDelIdx = taosArrayInit(0, sizeof(SDelIdx));
if (pReader->aDelIdx == NULL) { if (pReader->aDelIdx == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
...@@ -335,18 +522,26 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) { ...@@ -335,18 +522,26 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) {
int32_t code = 0; int32_t code = 0;
STsdbSnapReader* pReader = *ppReader; STsdbSnapReader* pReader = *ppReader;
if (pReader->pDataFReader) { // data
tsdbDataFReaderClose(&pReader->pDataFReader); if (pReader->pDataFReader) tsdbDataFReaderClose(&pReader->pDataFReader);
for (int32_t iIter = 0; iIter < sizeof(pReader->aFDataIter) / sizeof(pReader->aFDataIter[0]); iIter++) {
SFDataIter* pIter = &pReader->aFDataIter[iIter];
if (iIter == 0) {
taosArrayDestroy(pIter->aBlockIdx);
tMapDataClear(&pIter->mBlock);
} else {
taosArrayDestroy(pIter->aSttBlk);
} }
taosArrayDestroy(pReader->aSstBlk);
taosArrayDestroy(pReader->aBlockIdx);
tMapDataClear(&pReader->mBlock);
tBlockDataDestroy(&pReader->oBlockData, 1);
tBlockDataDestroy(&pReader->nBlockData, 1);
if (pReader->pDelFReader) { tBlockDataDestroy(&pIter->bData, 1);
tsdbDelFReaderClose(&pReader->pDelFReader);
} }
tBlockDataDestroy(&pReader->bData, 1);
tTSchemaDestroy(pReader->skmTable.pTSchema);
// del
if (pReader->pDelFReader) tsdbDelFReaderClose(&pReader->pDelFReader);
taosArrayDestroy(pReader->aDelIdx); taosArrayDestroy(pReader->aDelIdx);
taosArrayDestroy(pReader->aDelData); taosArrayDestroy(pReader->aDelData);
...@@ -354,6 +549,10 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) { ...@@ -354,6 +549,10 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) {
tsdbInfo("vgId:%d, vnode snapshot tsdb reader closed for %s", TD_VID(pReader->pTsdb->pVnode), pReader->pTsdb->path); tsdbInfo("vgId:%d, vnode snapshot tsdb reader closed for %s", TD_VID(pReader->pTsdb->pVnode), pReader->pTsdb->path);
for (int32_t iBuf = 0; iBuf < sizeof(pReader->aBuf) / sizeof(pReader->aBuf[0]); iBuf++) {
tFree(pReader->aBuf[iBuf]);
}
taosMemoryFree(pReader); taosMemoryFree(pReader);
*ppReader = NULL; *ppReader = NULL;
return code; return code;
...@@ -416,34 +615,31 @@ struct STsdbSnapWriter { ...@@ -416,34 +615,31 @@ struct STsdbSnapWriter {
int32_t maxRow; int32_t maxRow;
int8_t cmprAlg; int8_t cmprAlg;
int64_t commitID; int64_t commitID;
uint8_t* aBuf[5]; uint8_t* aBuf[5];
// for data file // for data file
SBlockData bData; SBlockData bData;
int32_t fid; int32_t fid;
SDataFReader* pDataFReader; TABLEID id;
SArray* aBlockIdx; // SArray<SBlockIdx> SSkmInfo skmTable;
struct {
SDataFReader* pReader;
SArray* aBlockIdx;
int32_t iBlockIdx; int32_t iBlockIdx;
SBlockIdx* pBlockIdx; SBlockIdx* pBlockIdx;
SMapData mBlock; // SMapData<SDataBlk> SMapData mDataBlk;
int32_t iBlock; int32_t iDataBlk;
SBlockData* pBlockData; SBlockData bData;
int32_t iRow; int32_t iRow;
SBlockData bDataR; } dReader;
SArray* aSstBlk; // SArray<SSttBlk> struct {
int32_t iBlockL; SDataFWriter* pWriter;
SBlockData lDataR; SArray* aBlockIdx;
SMapData mDataBlk;
SDataFWriter* pDataFWriter; SArray* aSttBlk;
SBlockIdx* pBlockIdxW; // NULL when no committing table SBlockData bData;
SDataBlk blockW; SBlockData sData;
SBlockData bDataW; } dWriter;
SBlockIdx blockIdxW;
SMapData mBlockW; // SMapData<SDataBlk>
SArray* aBlockIdxW; // SArray<SBlockIdx>
SArray* aBlockLW; // SArray<SSttBlk>
// for del file // for del file
SDelFReader* pDelFReader; SDelFReader* pDelFReader;
...@@ -454,520 +650,447 @@ struct STsdbSnapWriter { ...@@ -454,520 +650,447 @@ struct STsdbSnapWriter {
SArray* aDelIdxW; SArray* aDelIdxW;
}; };
static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) { // SNAP_DATA_TSDB
extern int32_t tsdbWriteDataBlock(SDataFWriter* pWriter, SBlockData* pBlockData, SMapData* mDataBlk, int8_t cmprAlg);
extern int32_t tsdbWriteSttBlock(SDataFWriter* pWriter, SBlockData* pBlockData, SArray* aSttBlk, int8_t cmprAlg);
static int32_t tsdbSnapNextTableData(STsdbSnapWriter* pWriter) {
int32_t code = 0; int32_t code = 0;
ASSERT(pWriter->pDataFWriter); ASSERT(pWriter->dReader.iRow >= pWriter->dReader.bData.nRow);
if (pWriter->pBlockIdxW == NULL) goto _exit; if (pWriter->dReader.iBlockIdx < taosArrayGetSize(pWriter->dReader.aBlockIdx)) {
pWriter->dReader.pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->dReader.aBlockIdx, pWriter->dReader.iBlockIdx);
// consume remain rows code = tsdbReadDataBlk(pWriter->dReader.pReader, pWriter->dReader.pBlockIdx, &pWriter->dReader.mDataBlk);
if (pWriter->pBlockData) { if (code) goto _exit;
ASSERT(pWriter->iRow < pWriter->pBlockData->nRow);
while (pWriter->iRow < pWriter->pBlockData->nRow) {
code = tBlockDataAppendRow(&pWriter->bDataW, &tsdbRowFromBlockData(pWriter->pBlockData, pWriter->iRow), NULL,
0); // todo
if (code) goto _err;
if (pWriter->bDataW.nRow >= pWriter->maxRow * 4 / 5) { pWriter->dReader.iBlockIdx++;
// pWriter->blockW.last = 0; } else {
// code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW, pWriter->dReader.pBlockIdx = NULL;
// &pWriter->blockW, pWriter->cmprAlg); tMapDataReset(&pWriter->dReader.mDataBlk);
if (code) goto _err;
code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutDataBlk);
if (code) goto _err;
tDataBlkReset(&pWriter->blockW);
tBlockDataClear(&pWriter->bDataW);
} }
pWriter->dReader.iDataBlk = 0; // point to the next one
tBlockDataReset(&pWriter->dReader.bData);
pWriter->dReader.iRow = 0;
pWriter->iRow++; _exit:
} return code;
} }
// write remain data if has static int32_t tsdbSnapWriteCopyData(STsdbSnapWriter* pWriter, TABLEID* pId) {
if (pWriter->bDataW.nRow > 0) { int32_t code = 0;
// pWriter->blockW.last = 0;
if (pWriter->bDataW.nRow < pWriter->minRow) {
if (pWriter->iBlock > pWriter->mBlock.nItem) {
// pWriter->blockW.last = 1;
}
}
// code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW, while (true) {
// &pWriter->blockW, pWriter->cmprAlg); if (pWriter->dReader.pBlockIdx == NULL) break;
// if (code) goto _err; if (tTABLEIDCmprFn(pWriter->dReader.pBlockIdx, pId) >= 0) break;
code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutDataBlk); SBlockIdx blkIdx = *pWriter->dReader.pBlockIdx;
if (code) goto _err; code = tsdbWriteDataBlk(pWriter->dWriter.pWriter, &pWriter->dReader.mDataBlk, &blkIdx);
} if (code) goto _exit;
while (true) { if (taosArrayPush(pWriter->dWriter.aBlockIdx, &blkIdx) == NULL) {
if (pWriter->iBlock >= pWriter->mBlock.nItem) break; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
SDataBlk block; code = tsdbSnapNextTableData(pWriter);
tMapDataGetItemByIdx(&pWriter->mBlock, pWriter->iBlock, &block, tGetDataBlk); if (code) goto _exit;
}
// if (block.last) { _exit:
// code = tsdbReadBlockData(pWriter->pDataFReader, pWriter->pBlockIdx, &block, &pWriter->bDataR, NULL, NULL); return code;
// if (code) goto _err; }
// tBlockReset(&block); static int32_t tsdbSnapWriteTableDataStart(STsdbSnapWriter* pWriter, TABLEID* pId) {
// block.last = 1; int32_t code = 0;
// code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataR, NULL, NULL, pWriter->pBlockIdxW, &block,
// pWriter->cmprAlg);
// if (code) goto _err;
// }
code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutDataBlk); code = tsdbSnapWriteCopyData(pWriter, pId);
if (code) goto _err; if (code) goto _err;
pWriter->iBlock++; pWriter->id.suid = pId->suid;
} pWriter->id.uid = pId->uid;
// SDataBlk code = tsdbUpdateTableSchema(pWriter->pTsdb->pVnode->pMeta, pId->suid, pId->uid, &pWriter->skmTable);
// code = tsdbWriteBlock(pWriter->pDataFWriter, &pWriter->mBlockW, NULL, pWriter->pBlockIdxW); if (code) goto _err;
// if (code) goto _err;
// SBlockIdx tMapDataReset(&pWriter->dWriter.mDataBlk);
if (taosArrayPush(pWriter->aBlockIdxW, pWriter->pBlockIdxW) == NULL) { code = tBlockDataInit(&pWriter->dWriter.bData, pId->suid, pId->uid, pWriter->skmTable.pTSchema);
code = TSDB_CODE_OUT_OF_MEMORY; if (code) goto _err;
goto _err;
}
_exit:
tsdbInfo("vgId:%d, tsdb snapshot write table data end for %s", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path);
return code; return code;
_err: _err:
tsdbError("vgId:%d, tsdb snapshot write table data end for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), tsdbError("vgId:%d %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, tstrerror(code));
pWriter->pTsdb->path, tstrerror(code));
return code; return code;
} }
static int32_t tsdbSnapMoveWriteTableData(STsdbSnapWriter* pWriter, SBlockIdx* pBlockIdx) { static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) {
int32_t code = 0; int32_t code = 0;
code = tsdbReadBlock(pWriter->pDataFReader, pBlockIdx, &pWriter->mBlock); if (pWriter->id.suid == 0 && pWriter->id.uid == 0) return code;
int32_t c = 1;
if (pWriter->dReader.pBlockIdx) {
c = tTABLEIDCmprFn(pWriter->dReader.pBlockIdx, &pWriter->id);
ASSERT(c >= 0);
}
if (c == 0) {
SBlockData* pBData = &pWriter->dWriter.bData;
for (; pWriter->dReader.iRow < pWriter->dReader.bData.nRow; pWriter->dReader.iRow++) {
TSDBROW row = tsdbRowFromBlockData(&pWriter->dReader.bData, pWriter->dReader.iRow);
code = tBlockDataAppendRow(pBData, &row, NULL, pWriter->id.uid);
if (code) goto _err; if (code) goto _err;
// SBlockData if (pBData->nRow >= pWriter->maxRow) {
SDataBlk block; code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, pBData, &pWriter->dWriter.mDataBlk, pWriter->cmprAlg);
tMapDataReset(&pWriter->mBlockW); if (code) goto _err;
for (int32_t iBlock = 0; iBlock < pWriter->mBlock.nItem; iBlock++) { }
tMapDataGetItemByIdx(&pWriter->mBlock, iBlock, &block, tGetDataBlk); }
// if (block.last) { code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, pBData, &pWriter->dWriter.mDataBlk, pWriter->cmprAlg);
// code = tsdbReadBlockData(pWriter->pDataFReader, pBlockIdx, &block, &pWriter->bDataR, NULL, NULL); if (code) goto _err;
// if (code) goto _err;
// tBlockReset(&block); for (; pWriter->dReader.iDataBlk < pWriter->dReader.mDataBlk.nItem; pWriter->dReader.iDataBlk++) {
// block.last = 1; SDataBlk dataBlk;
// code = tMapDataGetItemByIdx(&pWriter->dReader.mDataBlk, pWriter->dReader.iDataBlk, &dataBlk, tGetDataBlk);
// tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataR, NULL, NULL, pBlockIdx, &block,
// pWriter->cmprAlg);
// if (code) goto _err;
// }
code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutDataBlk); code = tMapDataPutItem(&pWriter->dWriter.mDataBlk, &dataBlk, tPutDataBlk);
if (code) goto _err; if (code) goto _err;
} }
// SDataBlk code = tsdbSnapNextTableData(pWriter);
SBlockIdx blockIdx = {.suid = pBlockIdx->suid, .uid = pBlockIdx->uid};
code = tsdbWriteBlock(pWriter->pDataFWriter, &pWriter->mBlockW, &blockIdx);
if (code) goto _err; if (code) goto _err;
}
// SBlockIdx if (pWriter->dWriter.mDataBlk.nItem) {
if (taosArrayPush(pWriter->aBlockIdxW, &blockIdx) == NULL) { SBlockIdx blockIdx = {.suid = pWriter->id.suid, .uid = pWriter->id.uid};
code = tsdbWriteDataBlk(pWriter->dWriter.pWriter, &pWriter->dWriter.mDataBlk, &blockIdx);
if (taosArrayPush(pWriter->dWriter.aBlockIdx, &blockIdx) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
}
pWriter->id.suid = 0;
pWriter->id.uid = 0;
_exit:
return code; return code;
_err: _err:
tsdbError("vgId:%d, tsdb snapshot move write table data for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode),
pWriter->pTsdb->path, tstrerror(code));
return code; return code;
} }
static int32_t tsdbSnapWriteTableDataImpl(STsdbSnapWriter* pWriter) { static int32_t tsdbSnapWriteOpenFile(STsdbSnapWriter* pWriter, int32_t fid) {
int32_t code = 0; int32_t code = 0;
SBlockData* pBlockData = &pWriter->bData; STsdb* pTsdb = pWriter->pTsdb;
int32_t iRow = 0;
TSDBROW row;
TSDBROW* pRow = &row;
// // correct schema
// code = tBlockDataCorrectSchema(&pWriter->bDataW, pBlockData);
// if (code) goto _err;
// loop to merge
*pRow = tsdbRowFromBlockData(pBlockData, iRow);
while (true) {
if (pRow == NULL) break;
if (pWriter->pBlockData) {
ASSERT(pWriter->iRow < pWriter->pBlockData->nRow);
int32_t c = tsdbRowCmprFn(pRow, &tsdbRowFromBlockData(pWriter->pBlockData, pWriter->iRow)); ASSERT(pWriter->dWriter.pWriter == NULL);
ASSERT(c); pWriter->fid = fid;
pWriter->id = (TABLEID){0};
SDFileSet* pSet = taosArraySearch(pWriter->fs.aDFileSet, &(SDFileSet){.fid = fid}, tDFileSetCmprFn, TD_EQ);
if (c < 0) { // Reader
// code = tBlockDataAppendRow(&pWriter->bDataW, pRow, NULL); if (pSet) {
// if (code) goto _err; code = tsdbDataFReaderOpen(&pWriter->dReader.pReader, pWriter->pTsdb, pSet);
if (code) goto _err;
iRow++; code = tsdbReadBlockIdx(pWriter->dReader.pReader, pWriter->dReader.aBlockIdx);
if (iRow < pWriter->pBlockData->nRow) { if (code) goto _err;
*pRow = tsdbRowFromBlockData(pBlockData, iRow);
} else { } else {
pRow = NULL; ASSERT(pWriter->dReader.pReader == NULL);
taosArrayClear(pWriter->dReader.aBlockIdx);
} }
} else if (c > 0) { pWriter->dReader.iBlockIdx = 0; // point to the next one
// code = tBlockDataAppendRow(&pWriter->bDataW, &tsdbRowFromBlockData(pWriter->pBlockData, pWriter->iRow), code = tsdbSnapNextTableData(pWriter);
// NULL); if (code) goto _err; if (code) goto _err;
pWriter->iRow++; // Writer
if (pWriter->iRow >= pWriter->pBlockData->nRow) { SHeadFile fHead = {.commitID = pWriter->commitID};
pWriter->pBlockData = NULL; SDataFile fData = {.commitID = pWriter->commitID};
} SSmaFile fSma = {.commitID = pWriter->commitID};
SSttFile fStt = {.commitID = pWriter->commitID};
SDFileSet wSet = {.fid = pWriter->fid, .pHeadF = &fHead, .pDataF = &fData, .pSmaF = &fSma};
if (pSet) {
wSet.diskId = pSet->diskId;
fData = *pSet->pDataF;
fSma = *pSet->pSmaF;
for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
wSet.aSttF[iStt] = pSet->aSttF[iStt];
} }
wSet.nSttF = pSet->nSttF + 1; // TODO: fix pSet->nSttF == pTsdb->maxFile
} else { } else {
TSDBKEY key = TSDBROW_KEY(pRow); SDiskID did = {0};
tfsAllocDisk(pTsdb->pVnode->pTfs, 0, &did);
while (true) { tfsMkdirRecurAt(pTsdb->pVnode->pTfs, pTsdb->path, did);
if (pWriter->iBlock >= pWriter->mBlock.nItem) break; wSet.diskId = did;
wSet.nSttF = 1;
SDataBlk block; }
int32_t c; wSet.aSttF[wSet.nSttF - 1] = &fStt;
tMapDataGetItemByIdx(&pWriter->mBlock, pWriter->iBlock, &block, tGetDataBlk);
// if (block.last) {
// pWriter->pBlockData = &pWriter->bDataR;
// code = tsdbReadBlockData(pWriter->pDataFReader, pWriter->pBlockIdx, &block, pWriter->pBlockData, NULL, code = tsdbDataFWriterOpen(&pWriter->dWriter.pWriter, pWriter->pTsdb, &wSet);
// NULL); if (code) goto _err; pWriter->iRow = 0; if (code) goto _err;
taosArrayClear(pWriter->dWriter.aBlockIdx);
tMapDataReset(&pWriter->dWriter.mDataBlk);
taosArrayClear(pWriter->dWriter.aSttBlk);
tBlockDataReset(&pWriter->dWriter.bData);
tBlockDataReset(&pWriter->dWriter.sData);
// pWriter->iBlock++; return code;
// break;
// }
c = tsdbKeyCmprFn(&block.maxKey, &key); _err:
return code;
}
ASSERT(c); static int32_t tsdbSnapWriteCloseFile(STsdbSnapWriter* pWriter) {
int32_t code = 0;
if (c < 0) { ASSERT(pWriter->dWriter.pWriter);
if (pWriter->bDataW.nRow) {
// pWriter->blockW.last = 0;
// code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW,
// &pWriter->blockW, pWriter->cmprAlg);
// if (code) goto _err;
code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutDataBlk); code = tsdbSnapWriteTableDataEnd(pWriter);
if (code) goto _err; if (code) goto _err;
tDataBlkReset(&pWriter->blockW); // copy remain table data
tBlockDataClear(&pWriter->bDataW); TABLEID id = {.suid = INT64_MAX, .uid = INT64_MAX};
} code = tsdbSnapWriteCopyData(pWriter, &id);
code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutDataBlk);
if (code) goto _err; if (code) goto _err;
pWriter->iBlock++; code =
} else { tsdbWriteSttBlock(pWriter->dWriter.pWriter, &pWriter->dWriter.sData, pWriter->dWriter.aSttBlk, pWriter->cmprAlg);
c = tsdbKeyCmprFn(&tBlockDataLastKey(pBlockData), &block.minKey); if (code) goto _err;
ASSERT(c);
if (c > 0) {
pWriter->pBlockData = &pWriter->bDataR;
// code =
// tsdbReadBlockData(pWriter->pDataFReader, pWriter->pBlockIdx, &block, pWriter->pBlockData, NULL,
// NULL);
// if (code) goto _err;
pWriter->iRow = 0;
pWriter->iBlock++;
}
break;
}
}
if (pWriter->pBlockData) continue;
// code = tBlockDataAppendRow(&pWriter->bDataW, pRow, NULL); // Indices
// if (code) goto _err; code = tsdbWriteBlockIdx(pWriter->dWriter.pWriter, pWriter->dWriter.aBlockIdx);
if (code) goto _err;
iRow++; code = tsdbWriteSttBlk(pWriter->dWriter.pWriter, pWriter->dWriter.aSttBlk);
if (iRow < pBlockData->nRow) { if (code) goto _err;
*pRow = tsdbRowFromBlockData(pBlockData, iRow);
} else {
pRow = NULL;
}
}
_check_write: code = tsdbUpdateDFileSetHeader(pWriter->dWriter.pWriter);
if (pWriter->bDataW.nRow < pWriter->maxRow * 4 / 5) continue; if (code) goto _err;
_write_block: code = tsdbFSUpsertFSet(&pWriter->fs, &pWriter->dWriter.pWriter->wSet);
// code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW, if (code) goto _err;
// &pWriter->blockW, pWriter->cmprAlg);
// if (code) goto _err;
code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutDataBlk); code = tsdbDataFWriterClose(&pWriter->dWriter.pWriter, 1);
if (code) goto _err; if (code) goto _err;
tDataBlkReset(&pWriter->blockW); if (pWriter->dReader.pReader) {
tBlockDataClear(&pWriter->bDataW); code = tsdbDataFReaderClose(&pWriter->dReader.pReader);
if (code) goto _err;
} }
_exit:
return code; return code;
_err: _err:
tsdbError("vgId:%d, vnode snapshot tsdb write table data impl for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode),
pWriter->pTsdb->path, tstrerror(code));
return code; return code;
} }
static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, TABLEID id) { static int32_t tsdbSnapWriteToDataFile(STsdbSnapWriter* pWriter, int32_t iRow, int8_t* done) {
int32_t code = 0; int32_t code = 0;
SBlockData* pBlockData = &pWriter->bData;
TSDBKEY keyFirst = tBlockDataFirstKey(pBlockData);
TSDBKEY keyLast = tBlockDataLastKey(pBlockData);
// end last table write if should SBlockData* pBData = &pWriter->bData;
if (pWriter->pBlockIdxW) { TABLEID id = {.suid = pBData->suid, .uid = pBData->uid ? pBData->uid : pBData->aUid[iRow]};
int32_t c = tTABLEIDCmprFn(pWriter->pBlockIdxW, &id); TSDBROW row = tsdbRowFromBlockData(pBData, iRow);
TSDBKEY key = TSDBROW_KEY(&row);
*done = 0;
while (pWriter->dReader.iRow < pWriter->dReader.bData.nRow ||
pWriter->dReader.iDataBlk < pWriter->dReader.mDataBlk.nItem) {
// Merge row by row
for (; pWriter->dReader.iRow < pWriter->dReader.bData.nRow; pWriter->dReader.iRow++) {
TSDBROW trow = tsdbRowFromBlockData(&pWriter->dReader.bData, pWriter->dReader.iRow);
TSDBKEY tKey = TSDBROW_KEY(&trow);
ASSERT(pWriter->dReader.bData.suid == id.suid && pWriter->dReader.bData.uid == id.uid);
int32_t c = tsdbKeyCmprFn(&key, &tKey);
if (c < 0) { if (c < 0) {
// end code = tBlockDataAppendRow(&pWriter->dWriter.bData, &row, NULL, id.uid);
code = tsdbSnapWriteTableDataEnd(pWriter);
if (code) goto _err; if (code) goto _err;
// reset
pWriter->pBlockIdxW = NULL;
} else if (c > 0) { } else if (c > 0) {
code = tBlockDataAppendRow(&pWriter->dWriter.bData, &trow, NULL, id.uid);
if (code) goto _err;
} else {
ASSERT(0); ASSERT(0);
} }
if (pWriter->dWriter.bData.nRow >= pWriter->maxRow) {
code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, &pWriter->dWriter.bData, &pWriter->dWriter.mDataBlk,
pWriter->cmprAlg);
if (code) goto _err;
} }
// start new table data write if need if (c < 0) {
if (pWriter->pBlockIdxW == NULL) { *done = 1;
// write table data ahead goto _exit;
while (true) { }
if (pWriter->iBlockIdx >= taosArrayGetSize(pWriter->aBlockIdx)) break; }
SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlockIdx); // Merge row by block
int32_t c = tTABLEIDCmprFn(pBlockIdx, &id); SDataBlk tDataBlk = {.minKey = key, .maxKey = key};
for (; pWriter->dReader.iDataBlk < pWriter->dReader.mDataBlk.nItem; pWriter->dReader.iDataBlk++) {
SDataBlk dataBlk;
tMapDataGetItemByIdx(&pWriter->dReader.mDataBlk, pWriter->dReader.iDataBlk, &dataBlk, tGetDataBlk);
if (c >= 0) break; int32_t c = tDataBlkCmprFn(&dataBlk, &tDataBlk);
if (c < 0) {
code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, &pWriter->dWriter.bData, &pWriter->dWriter.mDataBlk,
pWriter->cmprAlg);
if (code) goto _err;
code = tsdbSnapMoveWriteTableData(pWriter, pBlockIdx); code = tMapDataPutItem(&pWriter->dWriter.mDataBlk, &dataBlk, tPutDataBlk);
if (code) goto _err;
} else if (c > 0) {
code = tBlockDataAppendRow(&pWriter->dWriter.bData, &row, NULL, id.uid);
if (code) goto _err; if (code) goto _err;
pWriter->iBlockIdx++; if (pWriter->dWriter.bData.nRow >= pWriter->maxRow) {
code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, &pWriter->dWriter.bData, &pWriter->dWriter.mDataBlk,
pWriter->cmprAlg);
if (code) goto _err;
} }
// reader *done = 1;
pWriter->pBlockIdx = NULL; goto _exit;
if (pWriter->iBlockIdx < taosArrayGetSize(pWriter->aBlockIdx)) { } else {
ASSERT(pWriter->pDataFReader); code = tsdbReadDataBlockEx(pWriter->dReader.pReader, &dataBlk, &pWriter->dReader.bData);
if (code) goto _err;
SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlockIdx); pWriter->dReader.iRow = 0;
int32_t c = tTABLEIDCmprFn(pBlockIdx, &id);
ASSERT(c >= 0);
if (c == 0) { pWriter->dReader.iDataBlk++;
pWriter->pBlockIdx = pBlockIdx; break;
pWriter->iBlockIdx++; }
} }
} }
if (pWriter->pBlockIdx) { _exit:
code = tsdbReadBlock(pWriter->pDataFReader, pWriter->pBlockIdx, &pWriter->mBlock); return code;
_err:
tsdbError("vgId:%d %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, tstrerror(code));
return code;
}
static int32_t tsdbSnapWriteToSttFile(STsdbSnapWriter* pWriter, int32_t iRow) {
int32_t code = 0;
TABLEID id = {.suid = pWriter->bData.suid,
.uid = pWriter->bData.uid ? pWriter->bData.uid : pWriter->bData.aUid[iRow]};
TSDBROW row = tsdbRowFromBlockData(&pWriter->bData, iRow);
SBlockData* pBData = &pWriter->dWriter.sData;
if (pBData->suid || pBData->uid) {
if (!TABLE_SAME_SCHEMA(pBData->suid, pBData->uid, id.suid, id.uid)) {
code = tsdbWriteSttBlock(pWriter->dWriter.pWriter, pBData, pWriter->dWriter.aSttBlk, pWriter->cmprAlg);
if (code) goto _err; if (code) goto _err;
} else {
tMapDataReset(&pWriter->mBlock); pBData->suid = 0;
pBData->uid = 0;
}
} }
pWriter->iBlock = 0;
pWriter->pBlockData = NULL;
pWriter->iRow = 0;
// writer if (pBData->suid == 0 && pBData->uid == 0) {
pWriter->pBlockIdxW = &pWriter->blockIdxW; code = tsdbUpdateTableSchema(pWriter->pTsdb->pVnode->pMeta, pWriter->id.suid, pWriter->id.uid, &pWriter->skmTable);
pWriter->pBlockIdxW->suid = id.suid; if (code) goto _err;
pWriter->pBlockIdxW->uid = id.uid;
tDataBlkReset(&pWriter->blockW); code = tBlockDataInit(pBData, pWriter->id.suid, pWriter->id.suid ? 0 : pWriter->id.uid, pWriter->skmTable.pTSchema);
tBlockDataReset(&pWriter->bDataW); if (code) goto _err;
tMapDataReset(&pWriter->mBlockW);
} }
ASSERT(pWriter->pBlockIdxW && pWriter->pBlockIdxW->suid == id.suid && pWriter->pBlockIdxW->uid == id.uid); code = tBlockDataAppendRow(pBData, &row, NULL, id.uid);
ASSERT(pWriter->pBlockIdx == NULL || (pWriter->pBlockIdx->suid == id.suid && pWriter->pBlockIdx->uid == id.uid)); if (code) goto _err;
code = tsdbSnapWriteTableDataImpl(pWriter); if (pBData->nRow >= pWriter->maxRow) {
code = tsdbWriteSttBlock(pWriter->dWriter.pWriter, pBData, pWriter->dWriter.aSttBlk, pWriter->cmprAlg);
if (code) goto _err; if (code) goto _err;
}
_exit: _exit:
tsdbDebug("vgId:%d, vnode snapshot tsdb write data impl for %s", TD_VID(pWriter->pTsdb->pVnode),
pWriter->pTsdb->path);
return code; return code;
_err: _err:
tsdbError("vgId:%d, vnode snapshot tsdb write data impl for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode),
pWriter->pTsdb->path, tstrerror(code));
return code; return code;
} }
static int32_t tsdbSnapWriteDataEnd(STsdbSnapWriter* pWriter) { static int32_t tsdbSnapWriteRowData(STsdbSnapWriter* pWriter, int32_t iRow) {
int32_t code = 0; int32_t code = 0;
STsdb* pTsdb = pWriter->pTsdb;
if (pWriter->pDataFWriter == NULL) goto _exit; SBlockData* pBlockData = &pWriter->bData;
TABLEID id = {.suid = pBlockData->suid, .uid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[iRow]};
// finish current table // End last table data write if need
if (tTABLEIDCmprFn(&pWriter->id, &id) != 0) {
code = tsdbSnapWriteTableDataEnd(pWriter); code = tsdbSnapWriteTableDataEnd(pWriter);
if (code) goto _err; if (code) goto _err;
// move remain table
while (pWriter->iBlockIdx < taosArrayGetSize(pWriter->aBlockIdx)) {
code = tsdbSnapMoveWriteTableData(pWriter, (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlockIdx));
if (code) goto _err;
pWriter->iBlockIdx++;
} }
// write remain stuff // Start new table data write if need
if (taosArrayGetSize(pWriter->aBlockLW) > 0) { if (pWriter->id.suid == 0 && pWriter->id.uid == 0) {
code = tsdbWriteSttBlk(pWriter->pDataFWriter, pWriter->aBlockIdxW); code = tsdbSnapWriteTableDataStart(pWriter, &id);
if (code) goto _err; if (code) goto _err;
} }
if (taosArrayGetSize(pWriter->aBlockIdx) > 0) { // Merge with .data file data
code = tsdbWriteBlockIdx(pWriter->pDataFWriter, pWriter->aBlockIdxW); int8_t done = 0;
if (pWriter->dReader.pBlockIdx && tTABLEIDCmprFn(pWriter->dReader.pBlockIdx, &id) == 0) {
code = tsdbSnapWriteToDataFile(pWriter, iRow, &done);
if (code) goto _err; if (code) goto _err;
} }
code = tsdbFSUpsertFSet(&pWriter->fs, &pWriter->pDataFWriter->wSet); // Append to the .stt data block (todo: check if need to set/reload sst block)
if (code) goto _err; if (!done) {
code = tsdbSnapWriteToSttFile(pWriter, iRow);
code = tsdbDataFWriterClose(&pWriter->pDataFWriter, 1);
if (code) goto _err;
if (pWriter->pDataFReader) {
code = tsdbDataFReaderClose(&pWriter->pDataFReader);
if (code) goto _err; if (code) goto _err;
} }
_exit: _exit:
tsdbInfo("vgId:%d, vnode snapshot tsdb writer data end for %s", TD_VID(pTsdb->pVnode), pTsdb->path);
return code; return code;
_err: _err:
tsdbError("vgId:%d, vnode snapshot tsdb writer data end for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path, tsdbError("vgId:%d %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, tstrerror(code));
tstrerror(code));
return code; return code;
} }
static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0; int32_t code = 0;
STsdb* pTsdb = pWriter->pTsdb; STsdb* pTsdb = pWriter->pTsdb;
SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;
TABLEID id = *(TABLEID*)(pData + sizeof(SSnapDataHdr));
int64_t n;
// decode
SBlockData* pBlockData = &pWriter->bData; SBlockData* pBlockData = &pWriter->bData;
code = tDecmprBlockData(pData + sizeof(SSnapDataHdr) + sizeof(TABLEID), pHdr->size - sizeof(TABLEID), pBlockData,
pWriter->aBuf);
if (code) goto _err;
// open file // Decode data
TSDBKEY keyFirst = {.version = pBlockData->aVersion[0], .ts = pBlockData->aTSKEY[0]}; SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;
TSDBKEY keyLast = {.version = pBlockData->aVersion[pBlockData->nRow - 1], code = tDecmprBlockData(pHdr->data, pHdr->size, pBlockData, pWriter->aBuf);
.ts = pBlockData->aTSKEY[pBlockData->nRow - 1]};
int32_t fid = tsdbKeyFid(keyFirst.ts, pWriter->minutes, pWriter->precision);
ASSERT(fid == tsdbKeyFid(keyLast.ts, pWriter->minutes, pWriter->precision));
if (pWriter->pDataFWriter == NULL || pWriter->fid != fid) {
// end last file data write if need
code = tsdbSnapWriteDataEnd(pWriter);
if (code) goto _err; if (code) goto _err;
pWriter->fid = fid; ASSERT(pBlockData->nRow > 0);
// read // Loop to handle each row
SDFileSet* pSet = taosArraySearch(pWriter->fs.aDFileSet, &(SDFileSet){.fid = fid}, tDFileSetCmprFn, TD_EQ); for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) {
if (pSet) { TSKEY ts = pBlockData->aTSKEY[iRow];
code = tsdbDataFReaderOpen(&pWriter->pDataFReader, pTsdb, pSet); int32_t fid = tsdbKeyFid(ts, pWriter->minutes, pWriter->precision);
if (code) goto _err;
code = tsdbReadBlockIdx(pWriter->pDataFReader, pWriter->aBlockIdx); if (pWriter->dWriter.pWriter == NULL || pWriter->fid != fid) {
if (code) goto _err; if (pWriter->dWriter.pWriter) {
ASSERT(fid > pWriter->fid);
code = tsdbReadSttBlk(pWriter->pDataFReader, 0, pWriter->aSstBlk); code = tsdbSnapWriteCloseFile(pWriter);
if (code) goto _err; if (code) goto _err;
} else {
ASSERT(pWriter->pDataFReader == NULL);
taosArrayClear(pWriter->aBlockIdx);
taosArrayClear(pWriter->aSstBlk);
}
pWriter->iBlockIdx = 0;
pWriter->pBlockIdx = NULL;
tMapDataReset(&pWriter->mBlock);
pWriter->iBlock = 0;
pWriter->pBlockData = NULL;
pWriter->iRow = 0;
pWriter->iBlockL = 0;
tBlockDataReset(&pWriter->bDataR);
tBlockDataReset(&pWriter->lDataR);
// write
SHeadFile fHead;
SDataFile fData;
SSttFile fLast;
SSmaFile fSma;
SDFileSet wSet = {.pHeadF = &fHead, .pDataF = &fData, .aSttF[0] = &fLast, .pSmaF = &fSma};
if (pSet) {
wSet.diskId = pSet->diskId;
wSet.fid = fid;
wSet.nSttF = 1;
fHead = (SHeadFile){.commitID = pWriter->commitID, .offset = 0, .size = 0};
fData = *pSet->pDataF;
fLast = (SSttFile){.commitID = pWriter->commitID, .size = 0};
fSma = *pSet->pSmaF;
} else {
wSet.diskId = (SDiskID){.level = 0, .id = 0};
wSet.fid = fid;
wSet.nSttF = 1;
fHead = (SHeadFile){.commitID = pWriter->commitID, .offset = 0, .size = 0};
fData = (SDataFile){.commitID = pWriter->commitID, .size = 0};
fLast = (SSttFile){.commitID = pWriter->commitID, .size = 0, .offset = 0};
fSma = (SSmaFile){.commitID = pWriter->commitID, .size = 0};
} }
code = tsdbDataFWriterOpen(&pWriter->pDataFWriter, pTsdb, &wSet); code = tsdbSnapWriteOpenFile(pWriter, fid);
if (code) goto _err; if (code) goto _err;
taosArrayClear(pWriter->aBlockIdxW);
taosArrayClear(pWriter->aBlockLW);
tMapDataReset(&pWriter->mBlockW);
pWriter->pBlockIdxW = NULL;
tBlockDataReset(&pWriter->bDataW);
} }
code = tsdbSnapWriteTableData(pWriter, id); code = tsdbSnapWriteRowData(pWriter, iRow);
if (code) goto _err; if (code) goto _err;
}
tsdbInfo("vgId:%d, vnode snapshot tsdb write data for %s, fid:%d suid:%" PRId64 " uid:%" PRId64 " nRow:%d",
TD_VID(pTsdb->pVnode), pTsdb->path, fid, id.suid, id.suid, pBlockData->nRow);
return code; return code;
_err: _err:
...@@ -976,10 +1099,41 @@ _err: ...@@ -976,10 +1099,41 @@ _err:
return code; return code;
} }
// SNAP_DATA_DEL
static int32_t tsdbSnapMoveWriteDelData(STsdbSnapWriter* pWriter, TABLEID* pId) {
int32_t code = 0;
while (true) {
if (pWriter->iDelIdx >= taosArrayGetSize(pWriter->aDelIdxR)) break;
SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx);
if (tTABLEIDCmprFn(pDelIdx, pId) >= 0) break;
code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->aDelData);
if (code) goto _exit;
SDelIdx delIdx = *pDelIdx;
code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->aDelData, &delIdx);
if (code) goto _exit;
if (taosArrayPush(pWriter->aDelIdxW, &delIdx) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
pWriter->iDelIdx++;
}
_exit:
return code;
}
static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0; int32_t code = 0;
STsdb* pTsdb = pWriter->pTsdb; STsdb* pTsdb = pWriter->pTsdb;
// Open del file if not opened yet
if (pWriter->pDelFWriter == NULL) { if (pWriter->pDelFWriter == NULL) {
SDelFile* pDelFile = pWriter->fs.pDelFile; SDelFile* pDelFile = pWriter->fs.pDelFile;
...@@ -990,38 +1144,28 @@ static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32 ...@@ -990,38 +1144,28 @@ static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32
code = tsdbReadDelIdx(pWriter->pDelFReader, pWriter->aDelIdxR); code = tsdbReadDelIdx(pWriter->pDelFReader, pWriter->aDelIdxR);
if (code) goto _err; if (code) goto _err;
} else {
taosArrayClear(pWriter->aDelIdxR);
} }
pWriter->iDelIdx = 0;
// writer // writer
SDelFile delFile = {.commitID = pWriter->commitID, .offset = 0, .size = 0}; SDelFile delFile = {.commitID = pWriter->commitID};
code = tsdbDelFWriterOpen(&pWriter->pDelFWriter, &delFile, pTsdb); code = tsdbDelFWriterOpen(&pWriter->pDelFWriter, &delFile, pTsdb);
if (code) goto _err; if (code) goto _err;
taosArrayClear(pWriter->aDelIdxW);
} }
// process the del data SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;
TABLEID id = *(TABLEID*)(pData + sizeof(SSnapDataHdr)); TABLEID id = *(TABLEID*)pHdr->data;
while (true) {
if (pWriter->iDelIdx >= taosArrayGetSize(pWriter->aDelIdxR)) break;
if (tTABLEIDCmprFn(taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx), &id) >= 0) break;
SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx);
code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->aDelData); ASSERT(pHdr->size + sizeof(SSnapDataHdr) == nData);
if (code) goto _err;
SDelIdx delIdx = *pDelIdx; // Move write data < id
code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->aDelData, &delIdx); code = tsdbSnapMoveWriteDelData(pWriter, &id);
if (code) goto _err; if (code) goto _err;
if (taosArrayPush(pWriter->aDelIdxW, &delIdx) == NULL) { // Merge incoming data with current
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pWriter->iDelIdx++;
}
if (pWriter->iDelIdx < taosArrayGetSize(pWriter->aDelIdxR) && if (pWriter->iDelIdx < taosArrayGetSize(pWriter->aDelIdxR) &&
tTABLEIDCmprFn(taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx), &id) == 0) { tTABLEIDCmprFn(taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx), &id) == 0) {
SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx); SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx);
...@@ -1055,7 +1199,6 @@ static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32 ...@@ -1055,7 +1199,6 @@ static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32
goto _err; goto _err;
} }
_exit:
return code; return code;
_err: _err:
...@@ -1068,24 +1211,15 @@ static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) { ...@@ -1068,24 +1211,15 @@ static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) {
int32_t code = 0; int32_t code = 0;
STsdb* pTsdb = pWriter->pTsdb; STsdb* pTsdb = pWriter->pTsdb;
if (pWriter->pDelFWriter == NULL) goto _exit; if (pWriter->pDelFWriter == NULL) return code;
for (; pWriter->iDelIdx < taosArrayGetSize(pWriter->aDelIdxR); pWriter->iDelIdx++) {
SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx);
code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->aDelData); TABLEID id = {.suid = INT64_MAX, .uid = INT64_MAX};
code = tsdbSnapMoveWriteDelData(pWriter, &id);
if (code) goto _err; if (code) goto _err;
SDelIdx delIdx = *pDelIdx; code = tsdbWriteDelIdx(pWriter->pDelFWriter, pWriter->aDelIdxW);
code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->aDelData, &delIdx);
if (code) goto _err; if (code) goto _err;
if (taosArrayPush(pWriter->aDelIdxR, &delIdx) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
}
code = tsdbUpdateDelFileHdr(pWriter->pDelFWriter); code = tsdbUpdateDelFileHdr(pWriter->pDelFWriter);
if (code) goto _err; if (code) goto _err;
...@@ -1100,7 +1234,6 @@ static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) { ...@@ -1100,7 +1234,6 @@ static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) {
if (code) goto _err; if (code) goto _err;
} }
_exit:
tsdbInfo("vgId:%d, vnode snapshot tsdb write del for %s end", TD_VID(pTsdb->pVnode), pTsdb->path); tsdbInfo("vgId:%d, vnode snapshot tsdb write del for %s end", TD_VID(pTsdb->pVnode), pTsdb->path);
return code; return code;
...@@ -1110,6 +1243,7 @@ _err: ...@@ -1110,6 +1243,7 @@ _err:
return code; return code;
} }
// APIs
int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWriter** ppWriter) { int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWriter** ppWriter) {
int32_t code = 0; int32_t code = 0;
STsdbSnapWriter* pWriter = NULL; STsdbSnapWriter* pWriter = NULL;
...@@ -1135,39 +1269,38 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr ...@@ -1135,39 +1269,38 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr
pWriter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression; pWriter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression;
pWriter->commitID = pTsdb->pVnode->state.commitID; pWriter->commitID = pTsdb->pVnode->state.commitID;
// for data file // SNAP_DATA_TSDB
code = tBlockDataCreate(&pWriter->bData); code = tBlockDataCreate(&pWriter->bData);
if (code) goto _err; if (code) goto _err;
pWriter->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
if (pWriter->aBlockIdx == NULL) { pWriter->fid = INT32_MIN;
pWriter->id = (TABLEID){0};
// Reader
pWriter->dReader.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
if (pWriter->dReader.aBlockIdx == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
code = tBlockDataCreate(&pWriter->bDataR); code = tBlockDataCreate(&pWriter->dReader.bData);
if (code) goto _err; if (code) goto _err;
pWriter->aSstBlk = taosArrayInit(0, sizeof(SSttBlk)); // Writer
if (pWriter->aSstBlk == NULL) { pWriter->dWriter.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
if (pWriter->dWriter.aBlockIdx == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
pWriter->dWriter.aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
pWriter->aBlockIdxW = taosArrayInit(0, sizeof(SBlockIdx)); if (pWriter->dWriter.aSttBlk == NULL) {
if (pWriter->aBlockIdxW == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
code = tBlockDataCreate(&pWriter->bDataW); code = tBlockDataCreate(&pWriter->dWriter.bData);
if (code) goto _err;
code = tBlockDataCreate(&pWriter->dWriter.sData);
if (code) goto _err; if (code) goto _err;
pWriter->aBlockLW = taosArrayInit(0, sizeof(SSttBlk)); // SNAP_DATA_DEL
if (pWriter->aBlockLW == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
// for del file
pWriter->aDelIdxR = taosArrayInit(0, sizeof(SDelIdx)); pWriter->aDelIdxR = taosArrayInit(0, sizeof(SDelIdx));
if (pWriter->aDelIdxR == NULL) { if (pWriter->aDelIdxR == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
...@@ -1188,6 +1321,7 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr ...@@ -1188,6 +1321,7 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr
tsdbInfo("vgId:%d, tsdb snapshot writer open for %s succeed", TD_VID(pTsdb->pVnode), pTsdb->path); tsdbInfo("vgId:%d, tsdb snapshot writer open for %s succeed", TD_VID(pTsdb->pVnode), pTsdb->path);
return code; return code;
_err: _err:
tsdbError("vgId:%d, tsdb snapshot writer open for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path, tsdbError("vgId:%d, tsdb snapshot writer open for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path,
tstrerror(code)); tstrerror(code));
...@@ -1198,14 +1332,17 @@ _err: ...@@ -1198,14 +1332,17 @@ _err:
int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) { int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) {
int32_t code = 0; int32_t code = 0;
STsdbSnapWriter* pWriter = *ppWriter; STsdbSnapWriter* pWriter = *ppWriter;
STsdb* pTsdb = pWriter->pTsdb;
if (rollback) { if (rollback) {
ASSERT(0); ASSERT(0);
// code = tsdbFSRollback(pWriter->pTsdb->pFS); // code = tsdbFSRollback(pWriter->pTsdb->pFS);
// if (code) goto _err; // if (code) goto _err;
} else { } else {
code = tsdbSnapWriteDataEnd(pWriter); if (pWriter->dWriter.pWriter) {
code = tsdbSnapWriteCloseFile(pWriter);
if (code) goto _err; if (code) goto _err;
}
code = tsdbSnapWriteDelEnd(pWriter); code = tsdbSnapWriteDelEnd(pWriter);
if (code) goto _err; if (code) goto _err;
...@@ -1213,14 +1350,44 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) { ...@@ -1213,14 +1350,44 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) {
code = tsdbFSCommit1(pWriter->pTsdb, &pWriter->fs); code = tsdbFSCommit1(pWriter->pTsdb, &pWriter->fs);
if (code) goto _err; if (code) goto _err;
// lock
taosThreadRwlockWrlock(&pTsdb->rwLock);
code = tsdbFSCommit2(pWriter->pTsdb, &pWriter->fs); code = tsdbFSCommit2(pWriter->pTsdb, &pWriter->fs);
if (code) goto _err; if (code) {
taosThreadRwlockUnlock(&pTsdb->rwLock);
goto _err;
} }
// unlock
taosThreadRwlockUnlock(&pTsdb->rwLock);
}
// SNAP_DATA_DEL
taosArrayDestroy(pWriter->aDelIdxW);
taosArrayDestroy(pWriter->aDelData);
taosArrayDestroy(pWriter->aDelIdxR);
// SNAP_DATA_TSDB
// Writer
tBlockDataDestroy(&pWriter->dWriter.sData, 1);
tBlockDataDestroy(&pWriter->dWriter.bData, 1);
taosArrayDestroy(pWriter->dWriter.aSttBlk);
tMapDataClear(&pWriter->dWriter.mDataBlk);
taosArrayDestroy(pWriter->dWriter.aBlockIdx);
// Reader
tBlockDataDestroy(&pWriter->dReader.bData, 1);
tMapDataClear(&pWriter->dReader.mDataBlk);
taosArrayDestroy(pWriter->dReader.aBlockIdx);
tBlockDataDestroy(&pWriter->bData, 1);
tTSchemaDestroy(pWriter->skmTable.pTSchema);
for (int32_t iBuf = 0; iBuf < sizeof(pWriter->aBuf) / sizeof(uint8_t*); iBuf++) { for (int32_t iBuf = 0; iBuf < sizeof(pWriter->aBuf) / sizeof(uint8_t*); iBuf++) {
tFree(pWriter->aBuf[iBuf]); tFree(pWriter->aBuf[iBuf]);
} }
tsdbInfo("vgId:%d, vnode snapshot tsdb writer close for %s", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path); tsdbInfo("vgId:%d, vnode snapshot tsdb writer close for %s", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path);
taosMemoryFree(pWriter); taosMemoryFree(pWriter);
*ppWriter = NULL; *ppWriter = NULL;
...@@ -1245,8 +1412,8 @@ int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) ...@@ -1245,8 +1412,8 @@ int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData)
goto _exit; goto _exit;
} else { } else {
if (pWriter->pDataFWriter) { if (pWriter->dWriter.pWriter) {
code = tsdbSnapWriteDataEnd(pWriter); code = tsdbSnapWriteCloseFile(pWriter);
if (code) goto _err; if (code) goto _err;
} }
} }
...@@ -1259,7 +1426,6 @@ int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) ...@@ -1259,7 +1426,6 @@ int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData)
_exit: _exit:
tsdbDebug("vgId:%d, tsdb snapshot write for %s succeed", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path); tsdbDebug("vgId:%d, tsdb snapshot write for %s succeed", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path);
return code; return code;
_err: _err:
......
...@@ -354,7 +354,8 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) { ...@@ -354,7 +354,8 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
code = metaSnapWrite(pWriter->pMetaSnapWriter, pData, nData); code = metaSnapWrite(pWriter->pMetaSnapWriter, pData, nData);
if (code) goto _err; if (code) goto _err;
} break; } break;
case SNAP_DATA_TSDB: { case SNAP_DATA_TSDB:
case SNAP_DATA_DEL: {
// tsdb // tsdb
if (pWriter->pTsdbSnapWriter == NULL) { if (pWriter->pTsdbSnapWriter == NULL) {
code = tsdbSnapWriterOpen(pVnode->pTsdb, pWriter->sver, pWriter->ever, &pWriter->pTsdbSnapWriter); code = tsdbSnapWriterOpen(pVnode->pTsdb, pWriter->sver, pWriter->ever, &pWriter->pTsdbSnapWriter);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册