提交 2d789f5a 编写于 作者: H Hongze Cheng

more code

上级 36d01e84
...@@ -644,19 +644,19 @@ typedef struct { ...@@ -644,19 +644,19 @@ typedef struct {
} SRowInfo; } SRowInfo;
typedef struct SSttBlockLoadInfo { typedef struct SSttBlockLoadInfo {
SBlockData blockData[2]; SBlockData blockData[2];
SArray *aSttBlk; SArray *aSttBlk;
int32_t blockIndex[2]; // to denote the loaded block in the corresponding position. int32_t blockIndex[2]; // to denote the loaded block in the corresponding position.
int32_t currentLoadBlockIndex; int32_t currentLoadBlockIndex;
} SSttBlockLoadInfo; } SSttBlockLoadInfo;
typedef struct SMergeTree { typedef struct SMergeTree {
int8_t backward; int8_t backward;
SRBTree rbt; SRBTree rbt;
SArray *pIterList; SArray *pIterList;
SLDataIter *pIter; SLDataIter *pIter;
bool destroyLoadInfo; bool destroyLoadInfo;
SSttBlockLoadInfo* pLoadInfo; SSttBlockLoadInfo *pLoadInfo;
} SMergeTree; } SMergeTree;
typedef struct { typedef struct {
...@@ -666,15 +666,15 @@ typedef struct { ...@@ -666,15 +666,15 @@ typedef struct {
} SSkmInfo; } SSkmInfo;
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid, int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
STimeWindow *pTimeWindow, SVersionRange *pVerRange, void* pLoadInfo); STimeWindow *pTimeWindow, SVersionRange *pVerRange, void *pLoadInfo);
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter); void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter);
bool tMergeTreeNext(SMergeTree *pMTree); bool tMergeTreeNext(SMergeTree *pMTree);
TSDBROW tMergeTreeGetRow(SMergeTree *pMTree); TSDBROW tMergeTreeGetRow(SMergeTree *pMTree);
void tMergeTreeClose(SMergeTree *pMTree); void tMergeTreeClose(SMergeTree *pMTree);
SSttBlockLoadInfo* tCreateLastBlockLoadInfo(); SSttBlockLoadInfo *tCreateLastBlockLoadInfo();
void resetLastBlockLoadInfo(SSttBlockLoadInfo* pLoadInfo); void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo);
void* destroyLastBlockLoadInfo(SSttBlockLoadInfo* pLoadInfo); void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo);
// ========== inline functions ========== // ========== inline functions ==========
static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) { static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) {
......
...@@ -609,40 +609,37 @@ struct STsdbSnapWriter { ...@@ -609,40 +609,37 @@ struct STsdbSnapWriter {
STsdbFS fs; STsdbFS fs;
// config // config
int32_t minutes; int32_t minutes;
int8_t precision; int8_t precision;
int32_t minRow; int32_t minRow;
int32_t maxRow; int32_t maxRow;
int8_t cmprAlg; int8_t cmprAlg;
int64_t commitID; int64_t commitID;
uint8_t* aBuf[5]; uint8_t* aBuf[5];
// for data file // for data file
SBlockData bData; SBlockData bData;
int32_t fid; int32_t fid;
SDataFReader* pDataFReader; TABLEID id;
SArray* aBlockIdx; // SArray<SBlockIdx> struct {
int32_t iBlockIdx; SDataFReader* pReader;
SBlockIdx* pBlockIdx; SArray* aBlockIdx;
SMapData mBlock; // SMapData<SDataBlk> int32_t iBlockIdx;
int32_t iBlock; SBlockIdx* pBlockIdx;
SBlockData* pBlockData; SMapData mDataBlk;
int32_t iRow; int32_t iDataBlk;
SBlockData bDataR; SBlockData bData;
SArray* aSstBlk; // SArray<SSttBlk> int32_t iRow;
int32_t iBlockL; } dReader;
SBlockData lDataR; struct {
SDataFWriter* pWriter;
SDataFWriter* pDataFWriter; SArray* aBlockIdx;
SBlockIdx* pBlockIdxW; // NULL when no committing table SMapData mDataBlk;
SDataBlk blockW; SArray* aSttBlk;
SBlockData bDataW; SBlockData bData;
SBlockIdx blockIdxW; SBlockData sData;
} dWriter;
SMapData mBlockW; // SMapData<SDataBlk> SSkmInfo skmTable;
SArray* aBlockIdxW; // SArray<SBlockIdx>
SArray* aBlockLW; // SArray<SSttBlk>
// for del file // for del file
SDelFReader* pDelFReader; SDelFReader* pDelFReader;
...@@ -653,6 +650,8 @@ struct STsdbSnapWriter { ...@@ -653,6 +650,8 @@ struct STsdbSnapWriter {
SArray* aDelIdxW; SArray* aDelIdxW;
}; };
// SNAP_DATA_TSDB
#if 0
static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) { static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) {
int32_t code = 0; int32_t code = 0;
...@@ -1073,100 +1072,247 @@ _err: ...@@ -1073,100 +1072,247 @@ _err:
tstrerror(code)); tstrerror(code));
return code; return code;
} }
#endif
static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { static int32_t tsdbSnapWriteCloseFile(STsdbSnapWriter* pWriter);
int32_t code = 0; static int32_t tsdbSnapWriteOpenFile(STsdbSnapWriter* pWriter, int32_t fid) {
STsdb* pTsdb = pWriter->pTsdb; int32_t code = 0;
SSnapDataHdr* pHdr = (SSnapDataHdr*)pData; STsdb* pTsdb = pWriter->pTsdb;
TABLEID id = *(TABLEID*)(pData + sizeof(SSnapDataHdr));
int64_t n;
// decode // close last file if need
SBlockData* pBlockData = &pWriter->bData; if (pWriter->dWriter.pWriter) {
code = tDecmprBlockData(pData + sizeof(SSnapDataHdr) + sizeof(TABLEID), pHdr->size - sizeof(TABLEID), pBlockData, ASSERT(fid > pWriter->fid);
pWriter->aBuf); code = tsdbSnapWriteCloseFile(pWriter);
if (code) goto _err;
}
ASSERT(pWriter->dWriter.pWriter == NULL);
// open new
pWriter->fid = fid;
SDFileSet* pSet = taosArraySearch(pWriter->fs.aDFileSet, &(SDFileSet){.fid = fid}, tDFileSetCmprFn, TD_EQ);
// open reader
if (pSet) {
code = tsdbDataFReaderOpen(&pWriter->dReader.pReader, pWriter->pTsdb, pSet);
if (code) goto _err;
code = tsdbReadBlockIdx(pWriter->dReader.pReader, pWriter->dReader.aBlockIdx);
if (code) goto _err;
} else {
// TODO
}
// open writer
SHeadFile fHead = {.commitID = pWriter->commitID};
SDataFile fData = {.commitID = pWriter->commitID};
SSmaFile fSma = {.commitID = pWriter->commitID};
SSttFile fStt = {.commitID = pWriter->commitID};
SDFileSet wSet = {.fid = pWriter->fid, .pHeadF = &fHead, .pDataF = &fData, .pSmaF = &fSma};
if (pSet) {
wSet.diskId = pSet->diskId;
fData = *pSet->pDataF;
fSma = *pSet->pSmaF;
for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
wSet.aSttF[iStt] = pSet->aSttF[iStt];
}
wSet.nSttF = pSet->nSttF + 1; // TODO: fix pSet->nSttF == pTsdb->maxFile
} else {
SDiskID did = {0};
tfsAllocDisk(pTsdb->pVnode->pTfs, 0, &did);
tfsMkdirRecurAt(pTsdb->pVnode->pTfs, pTsdb->path, did);
wSet.diskId = did;
wSet.nSttF = 1;
}
wSet.aSttF[wSet.nSttF - 1] = &fStt;
code = tsdbDataFWriterOpen(&pWriter->dWriter.pWriter, pWriter->pTsdb, &wSet);
if (code) goto _err; if (code) goto _err;
taosArrayClear(pWriter->dWriter.aBlockIdx);
tMapDataReset(&pWriter->dWriter.mDataBlk);
taosArrayClear(pWriter->dWriter.aSttBlk);
return code;
_err:
return code;
}
static int32_t tsdbSnapWriteCloseFile(STsdbSnapWriter* pWriter) {
int32_t code = 0;
// TODO
return code;
}
// open file static int32_t tsdbSnapNextTableData(STsdbSnapWriter* pWriter) {
TSDBKEY keyFirst = {.version = pBlockData->aVersion[0], .ts = pBlockData->aTSKEY[0]}; int32_t code = 0;
TSDBKEY keyLast = {.version = pBlockData->aVersion[pBlockData->nRow - 1],
.ts = pBlockData->aTSKEY[pBlockData->nRow - 1]}; pWriter->dReader.iBlockIdx++;
if (pWriter->dReader.iBlockIdx < taosArrayGetSize(pWriter->dReader.aBlockIdx)) {
pWriter->dReader.pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->dReader.aBlockIdx, pWriter->dReader.iBlockIdx);
code = tsdbReadBlock(pWriter->dReader.pReader, pWriter->dReader.pBlockIdx, &pWriter->dReader.mDataBlk);
if (code) goto _exit;
pWriter->dReader.iDataBlk = -1;
tBlockDataReset(&pWriter->dReader.bData);
pWriter->dReader.iRow = 0;
} else {
pWriter->dReader.pBlockIdx = NULL;
}
_exit:
return code;
}
static int32_t tsdbSnapWriteCopyData(STsdbSnapWriter* pWriter, TABLEID* pId) {
int32_t code = 0;
while (true) {
if (pWriter->dReader.pBlockIdx == NULL) break;
if (tTABLEIDCmprFn(pWriter->dReader.pBlockIdx, pId) >= 0) break;
SBlockIdx blkIdx = *pWriter->dReader.pBlockIdx;
code = tsdbWriteBlock(pWriter->dWriter.pWriter, &pWriter->dReader.mDataBlk, &blkIdx);
if (code) goto _exit;
if (taosArrayPush(pWriter->dWriter.aBlockIdx, &blkIdx) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
code = tsdbSnapNextTableData(pWriter);
if (code) goto _exit;
}
_exit:
return code;
}
static int32_t tsdbSnapWriteRowData(STsdbSnapWriter* pWriter, int32_t iRow) {
int32_t code = 0;
SBlockData* pBlockData = &pWriter->bData;
TABLEID id = {.suid = pBlockData->suid, .uid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[iRow]};
TSDBROW row = tsdbRowFromBlockData(pBlockData, iRow);
TSDBKEY key = TSDBROW_KEY(&row);
// End last table data write if need
if (id.suid != pWriter->id.suid || id.uid != pWriter->id.uid) {
// TODO
pWriter->id.suid = 0;
pWriter->id.uid = 0;
}
int32_t fid = tsdbKeyFid(keyFirst.ts, pWriter->minutes, pWriter->precision); // Start new table data write if need
ASSERT(fid == tsdbKeyFid(keyLast.ts, pWriter->minutes, pWriter->precision)); if (pWriter->id.suid == 0 && pWriter->id.uid == 0) {
if (pWriter->pDataFWriter == NULL || pWriter->fid != fid) { // Copy table data ahead
// end last file data write if need code = tsdbSnapWriteCopyData(pWriter, &id);
code = tsdbSnapWriteDataEnd(pWriter);
if (code) goto _err; if (code) goto _err;
pWriter->fid = fid; // Start new table data
pWriter->id.suid = id.suid;
pWriter->id.uid = id.uid;
tMapDataReset(&pWriter->dWriter.mDataBlk);
}
// read // Merge with .data file data
SDFileSet* pSet = taosArraySearch(pWriter->fs.aDFileSet, &(SDFileSet){.fid = fid}, tDFileSetCmprFn, TD_EQ); if (pWriter->dReader.pBlockIdx && tTABLEIDCmprFn(pWriter->dReader.pBlockIdx, &id) == 0) {
if (pSet) { _merge_block:
code = tsdbDataFReaderOpen(&pWriter->pDataFReader, pTsdb, pSet); // merge with data block in row
if (code) goto _err; for (; pWriter->dReader.iRow < pWriter->dReader.bData.nRow; pWriter->dReader.iRow++) {
TSDBROW trow = tsdbRowFromBlockData(&pWriter->dReader.bData, pWriter->dReader.iRow);
TSDBKEY tKey = TSDBROW_KEY(&trow);
code = tsdbReadBlockIdx(pWriter->pDataFReader, pWriter->aBlockIdx); int32_t c = tsdbKeyCmprFn(&key, &tKey);
if (code) goto _err; if (c < 0) {
code = tBlockDataAppendRow(&pWriter->dWriter.bData, &row, NULL, id.uid);
if (code) goto _err;
} else if (c > 0) {
code = tBlockDataAppendRow(&pWriter->dWriter.bData, &trow, NULL, id.uid);
if (code) goto _err;
} else {
ASSERT(0);
}
code = tsdbReadSttBlk(pWriter->pDataFReader, 0, pWriter->aSstBlk); if (pWriter->dWriter.bData.nRow >= pWriter->maxRow) {
if (code) goto _err; // TODO: commit to the block
} else { }
ASSERT(pWriter->pDataFReader == NULL);
taosArrayClear(pWriter->aBlockIdx); if (c < 0) goto _exit;
taosArrayClear(pWriter->aSstBlk);
} }
pWriter->iBlockIdx = 0;
pWriter->pBlockIdx = NULL; // merge with dataBlk in whole
tMapDataReset(&pWriter->mBlock); SDataBlk tDataBlk = {.minKey = key, .maxKey = key};
pWriter->iBlock = 0; for (pWriter->dReader.iBlockIdx++; pWriter->dReader.iBlockIdx < pWriter->dReader.mDataBlk.nItem;
pWriter->pBlockData = NULL; pWriter->dReader.iBlockIdx++) {
pWriter->iRow = 0; SDataBlk dataBlk;
pWriter->iBlockL = 0; tMapDataGetItemByIdx(&pWriter->dReader.mDataBlk, pWriter->dReader.iBlockIdx, &dataBlk, tGetDataBlk);
tBlockDataReset(&pWriter->bDataR);
tBlockDataReset(&pWriter->lDataR); int32_t c = tDataBlkCmprFn(&dataBlk, &tDataBlk);
// write if (c < 0) {
SHeadFile fHead; code = tMapDataPutItem(&pWriter->dWriter.mDataBlk, &dataBlk, tPutDataBlk);
SDataFile fData; if (code) goto _err;
SSttFile fLast; } else if (c < 0) {
SSmaFile fSma; code = tBlockDataAppendRow(&pWriter->dWriter.bData, &row, NULL, id.uid);
SDFileSet wSet = {.pHeadF = &fHead, .pDataF = &fData, .aSttF[0] = &fLast, .pSmaF = &fSma}; if (code) goto _err;
if (pSet) { if (pWriter->dWriter.bData.nRow >= pWriter->maxRow) {
wSet.diskId = pSet->diskId; // TODO: write data block
wSet.fid = fid; }
wSet.nSttF = 1; } else {
fHead = (SHeadFile){.commitID = pWriter->commitID, .offset = 0, .size = 0}; code = tsdbReadDataBlockEx(pWriter->dReader.pReader, &dataBlk, &pWriter->dReader.bData);
fData = *pSet->pDataF; if (code) goto _err;
fLast = (SSttFile){.commitID = pWriter->commitID, .size = 0};
fSma = *pSet->pSmaF; goto _merge_block;
} else { }
wSet.diskId = (SDiskID){.level = 0, .id = 0};
wSet.fid = fid;
wSet.nSttF = 1;
fHead = (SHeadFile){.commitID = pWriter->commitID, .offset = 0, .size = 0};
fData = (SDataFile){.commitID = pWriter->commitID, .size = 0};
fLast = (SSttFile){.commitID = pWriter->commitID, .size = 0, .offset = 0};
fSma = (SSmaFile){.commitID = pWriter->commitID, .size = 0};
} }
code = tsdbDataFWriterOpen(&pWriter->pDataFWriter, pTsdb, &wSet); code = tsdbSnapNextTableData(pWriter);
if (code) goto _err; if (code) goto _err;
}
taosArrayClear(pWriter->aBlockIdxW); // Append to the .stt data block (todo: check if need to set/reload sst block)
taosArrayClear(pWriter->aBlockLW); code = tBlockDataAppendRow(&pWriter->dWriter.sData, &row, NULL, id.uid);
tMapDataReset(&pWriter->mBlockW); if (code) goto _err;
pWriter->pBlockIdxW = NULL;
tBlockDataReset(&pWriter->bDataW); if (pWriter->dWriter.sData.nRow >= pWriter->maxRow) {
// TODO: write sst block
} }
code = tsdbSnapWriteTableData(pWriter, id); _exit:
return code;
_err:
return code;
}
static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0;
STsdb* pTsdb = pWriter->pTsdb;
SBlockData* pBlockData = &pWriter->bData;
// Decode data
SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;
code = tDecmprBlockData(pHdr->data, pHdr->size, pBlockData, pWriter->aBuf);
if (code) goto _err; if (code) goto _err;
tsdbInfo("vgId:%d, vnode snapshot tsdb write data for %s, fid:%d suid:%" PRId64 " uid:%" PRId64 " nRow:%d", // Loop to handle each row
TD_VID(pTsdb->pVnode), pTsdb->path, fid, id.suid, id.suid, pBlockData->nRow); for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) {
// open file if need
TSKEY ts = pBlockData->aTSKEY[iRow];
int32_t fid = tsdbKeyFid(ts, pWriter->minutes, pWriter->precision);
if (pWriter->dWriter.pWriter == NULL || pWriter->fid != fid) {
code = tsdbSnapWriteOpenFile(pWriter, fid);
if (code) goto _err;
}
code = tsdbSnapWriteRowData(pWriter, iRow);
if (code) goto _err;
}
// tsdbInfo("vgId:%d, vnode snapshot tsdb write data for %s, fid:%d suid:%" PRId64 " uid:%" PRId64 " nRow:%d",
// TD_VID(pTsdb->pVnode), pTsdb->path, fid, id.suid, id.suid, pBlockData->nRow);
return code; return code;
_err: _err:
...@@ -1175,6 +1321,7 @@ _err: ...@@ -1175,6 +1321,7 @@ _err:
return code; return code;
} }
// SNAP_DATA_DEL
static int32_t tsdbSnapMoveWriteDelData(STsdbSnapWriter* pWriter, TABLEID* pId) { static int32_t tsdbSnapMoveWriteDelData(STsdbSnapWriter* pWriter, TABLEID* pId) {
int32_t code = 0; int32_t code = 0;
...@@ -1274,7 +1421,6 @@ static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32 ...@@ -1274,7 +1421,6 @@ static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32
goto _err; goto _err;
} }
_exit:
return code; return code;
_err: _err:
...@@ -1287,12 +1433,15 @@ static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) { ...@@ -1287,12 +1433,15 @@ static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) {
int32_t code = 0; int32_t code = 0;
STsdb* pTsdb = pWriter->pTsdb; STsdb* pTsdb = pWriter->pTsdb;
if (pWriter->pDelFWriter == NULL) goto _exit; if (pWriter->pDelFWriter == NULL) return code;
TABLEID id = {.suid = INT64_MAX, .uid = INT64_MAX}; TABLEID id = {.suid = INT64_MAX, .uid = INT64_MAX};
code = tsdbSnapMoveWriteDelData(pWriter, &id); code = tsdbSnapMoveWriteDelData(pWriter, &id);
if (code) goto _err; if (code) goto _err;
code = tsdbWriteDelIdx(pWriter->pDelFWriter, pWriter->aDelIdxW);
if (code) goto _err;
code = tsdbUpdateDelFileHdr(pWriter->pDelFWriter); code = tsdbUpdateDelFileHdr(pWriter->pDelFWriter);
if (code) goto _err; if (code) goto _err;
...@@ -1307,7 +1456,6 @@ static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) { ...@@ -1307,7 +1456,6 @@ static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) {
if (code) goto _err; if (code) goto _err;
} }
_exit:
tsdbInfo("vgId:%d, vnode snapshot tsdb write del for %s end", TD_VID(pTsdb->pVnode), pTsdb->path); tsdbInfo("vgId:%d, vnode snapshot tsdb write del for %s end", TD_VID(pTsdb->pVnode), pTsdb->path);
return code; return code;
...@@ -1344,8 +1492,9 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr ...@@ -1344,8 +1492,9 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr
// for data file // for data file
code = tBlockDataCreate(&pWriter->bData); code = tBlockDataCreate(&pWriter->bData);
if (code) goto _err; if (code) goto _err;
#if 0
pWriter->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); pWriter->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
if (pWriter->aBlockIdx == NULL) { if (pWriter->aBlockIdx == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
...@@ -1373,8 +1522,9 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr ...@@ -1373,8 +1522,9 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
#endif
// for del file // SNAP_DATA_DEL
pWriter->aDelIdxR = taosArrayInit(0, sizeof(SDelIdx)); pWriter->aDelIdxR = taosArrayInit(0, sizeof(SDelIdx));
if (pWriter->aDelIdxR == NULL) { if (pWriter->aDelIdxR == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
...@@ -1395,6 +1545,7 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr ...@@ -1395,6 +1545,7 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr
tsdbInfo("vgId:%d, tsdb snapshot writer open for %s succeed", TD_VID(pTsdb->pVnode), pTsdb->path); tsdbInfo("vgId:%d, tsdb snapshot writer open for %s succeed", TD_VID(pTsdb->pVnode), pTsdb->path);
return code; return code;
_err: _err:
tsdbError("vgId:%d, tsdb snapshot writer open for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path, tsdbError("vgId:%d, tsdb snapshot writer open for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path,
tstrerror(code)); tstrerror(code));
...@@ -1411,7 +1562,7 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) { ...@@ -1411,7 +1562,7 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) {
// code = tsdbFSRollback(pWriter->pTsdb->pFS); // code = tsdbFSRollback(pWriter->pTsdb->pFS);
// if (code) goto _err; // if (code) goto _err;
} else { } else {
code = tsdbSnapWriteDataEnd(pWriter); // code = tsdbSnapWriteDataEnd(pWriter);
if (code) goto _err; if (code) goto _err;
code = tsdbSnapWriteDelEnd(pWriter); code = tsdbSnapWriteDelEnd(pWriter);
...@@ -1452,8 +1603,8 @@ int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) ...@@ -1452,8 +1603,8 @@ int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData)
goto _exit; goto _exit;
} else { } else {
if (pWriter->pDataFWriter) { if (pWriter->dWriter.pWriter) {
code = tsdbSnapWriteDataEnd(pWriter); // code = tsdbSnapWriteDataEnd(pWriter);
if (code) goto _err; if (code) goto _err;
} }
} }
...@@ -1466,7 +1617,6 @@ int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) ...@@ -1466,7 +1617,6 @@ int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData)
_exit: _exit:
tsdbDebug("vgId:%d, tsdb snapshot write for %s succeed", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path); tsdbDebug("vgId:%d, tsdb snapshot write for %s succeed", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path);
return code; return code;
_err: _err:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册