提交 d789fe6a 编写于 作者: H Hongze Cheng

refact commit code

上级 5fd403b8
...@@ -115,33 +115,32 @@ int32_t tRowInfoCmprFn(const void *p1, const void *p2) { ...@@ -115,33 +115,32 @@ int32_t tRowInfoCmprFn(const void *p1, const void *p2) {
int32_t tsdbBegin(STsdb *pTsdb) { int32_t tsdbBegin(STsdb *pTsdb) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
if (!pTsdb) return code; if (!pTsdb) return code;
SMemTable *pMemTable; SMemTable *pMemTable;
code = tsdbMemTableCreate(pTsdb, &pMemTable); code = tsdbMemTableCreate(pTsdb, &pMemTable);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
// lock // lock
code = taosThreadRwlockWrlock(&pTsdb->rwLock); if ((code = taosThreadRwlockWrlock(&pTsdb->rwLock))) {
if (code) {
code = TAOS_SYSTEM_ERROR(code); code = TAOS_SYSTEM_ERROR(code);
goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
pTsdb->mem = pMemTable; pTsdb->mem = pMemTable;
// unlock // unlock
code = taosThreadRwlockUnlock(&pTsdb->rwLock); if ((code = taosThreadRwlockUnlock(&pTsdb->rwLock))) {
if (code) {
code = TAOS_SYSTEM_ERROR(code); code = TAOS_SYSTEM_ERROR(code);
goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
return code; _exit:
if (code) {
_err: tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
tsdbError("vgId:%d, tsdb begin failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); }
return code; return code;
} }
...@@ -149,6 +148,7 @@ int32_t tsdbCommit(STsdb *pTsdb) { ...@@ -149,6 +148,7 @@ int32_t tsdbCommit(STsdb *pTsdb) {
if (!pTsdb) return 0; if (!pTsdb) return 0;
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
SCommitter commith; SCommitter commith;
SMemTable *pMemTable = pTsdb->mem; SMemTable *pMemTable = pTsdb->mem;
...@@ -164,76 +164,74 @@ int32_t tsdbCommit(STsdb *pTsdb) { ...@@ -164,76 +164,74 @@ int32_t tsdbCommit(STsdb *pTsdb) {
// start commit // start commit
code = tsdbStartCommit(pTsdb, &commith); code = tsdbStartCommit(pTsdb, &commith);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
// commit impl // commit impl
code = tsdbCommitData(&commith); code = tsdbCommitData(&commith);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbCommitDel(&commith); code = tsdbCommitDel(&commith);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
// end commit // end commit
code = tsdbEndCommit(&commith, 0); code = tsdbEndCommit(&commith, 0);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
_exit: _exit:
return code; if (code) {
tsdbEndCommit(&commith, code);
_err: tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
tsdbEndCommit(&commith, code); }
tsdbError("vgId:%d, failed to commit since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code; return code;
} }
static int32_t tsdbCommitDelStart(SCommitter *pCommitter) { static int32_t tsdbCommitDelStart(SCommitter *pCommitter) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
STsdb *pTsdb = pCommitter->pTsdb; STsdb *pTsdb = pCommitter->pTsdb;
SMemTable *pMemTable = pTsdb->imem; SMemTable *pMemTable = pTsdb->imem;
pCommitter->aDelIdx = taosArrayInit(0, sizeof(SDelIdx)); if ((pCommitter->aDelIdx = taosArrayInit(0, sizeof(SDelIdx))) == NULL) {
if (pCommitter->aDelIdx == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
pCommitter->aDelData = taosArrayInit(0, sizeof(SDelData)); if ((pCommitter->aDelData = taosArrayInit(0, sizeof(SDelData))) == NULL) {
if (pCommitter->aDelData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
pCommitter->aDelIdxN = taosArrayInit(0, sizeof(SDelIdx)); if ((pCommitter->aDelIdxN = taosArrayInit(0, sizeof(SDelIdx))) == NULL) {
if (pCommitter->aDelIdxN == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
SDelFile *pDelFileR = pCommitter->fs.pDelFile; SDelFile *pDelFileR = pCommitter->fs.pDelFile;
if (pDelFileR) { if (pDelFileR) {
code = tsdbDelFReaderOpen(&pCommitter->pDelFReader, pDelFileR, pTsdb); code = tsdbDelFReaderOpen(&pCommitter->pDelFReader, pDelFileR, pTsdb);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbReadDelIdx(pCommitter->pDelFReader, pCommitter->aDelIdx); code = tsdbReadDelIdx(pCommitter->pDelFReader, pCommitter->aDelIdx);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
// prepare new // prepare new
SDelFile wDelFile = {.commitID = pCommitter->commitID, .size = 0, .offset = 0}; SDelFile wDelFile = {.commitID = pCommitter->commitID, .size = 0, .offset = 0};
code = tsdbDelFWriterOpen(&pCommitter->pDelFWriter, &wDelFile, pTsdb); code = tsdbDelFWriterOpen(&pCommitter->pDelFWriter, &wDelFile, pTsdb);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
_exit: _exit:
tsdbDebug("vgId:%d, commit del start", TD_VID(pTsdb->pVnode)); if (code) {
return code; tsdbError("vgId:%d, %s failed at %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
} else {
_err: tsdbDebug("vgId:%d, commit del start", TD_VID(pTsdb->pVnode));
tsdbError("vgId:%d, commit del start failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); }
return code; return code;
} }
static int32_t tsdbCommitTableDel(SCommitter *pCommitter, STbData *pTbData, SDelIdx *pDelIdx) { static int32_t tsdbCommitTableDel(SCommitter *pCommitter, STbData *pTbData, SDelIdx *pDelIdx) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
SDelData *pDelData; SDelData *pDelData;
tb_uid_t suid; tb_uid_t suid;
tb_uid_t uid; tb_uid_t uid;
...@@ -252,7 +250,7 @@ static int32_t tsdbCommitTableDel(SCommitter *pCommitter, STbData *pTbData, SDel ...@@ -252,7 +250,7 @@ static int32_t tsdbCommitTableDel(SCommitter *pCommitter, STbData *pTbData, SDel
uid = pDelIdx->uid; uid = pDelIdx->uid;
code = tsdbReadDelData(pCommitter->pDelFReader, pDelIdx, pCommitter->aDelData); code = tsdbReadDelData(pCommitter->pDelFReader, pDelIdx, pCommitter->aDelData);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} else { } else {
taosArrayClear(pCommitter->aDelData); taosArrayClear(pCommitter->aDelData);
} }
...@@ -266,62 +264,63 @@ static int32_t tsdbCommitTableDel(SCommitter *pCommitter, STbData *pTbData, SDel ...@@ -266,62 +264,63 @@ static int32_t tsdbCommitTableDel(SCommitter *pCommitter, STbData *pTbData, SDel
for (; pDelData; pDelData = pDelData->pNext) { for (; pDelData; pDelData = pDelData->pNext) {
if (taosArrayPush(pCommitter->aDelData, pDelData) == NULL) { if (taosArrayPush(pCommitter->aDelData, pDelData) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
} }
// write // write
code = tsdbWriteDelData(pCommitter->pDelFWriter, pCommitter->aDelData, &delIdx); code = tsdbWriteDelData(pCommitter->pDelFWriter, pCommitter->aDelData, &delIdx);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
// put delIdx // put delIdx
if (taosArrayPush(pCommitter->aDelIdxN, &delIdx) == NULL) { if (taosArrayPush(pCommitter->aDelIdxN, &delIdx) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
_exit: _exit:
return code; if (code) {
tsdbError("vgId:%d, %s failed at %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code));
_err: }
tsdbError("vgId:%d, commit table del failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
return code; return code;
} }
static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) { static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
STsdb *pTsdb = pCommitter->pTsdb; STsdb *pTsdb = pCommitter->pTsdb;
code = tsdbWriteDelIdx(pCommitter->pDelFWriter, pCommitter->aDelIdxN); code = tsdbWriteDelIdx(pCommitter->pDelFWriter, pCommitter->aDelIdxN);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbUpdateDelFileHdr(pCommitter->pDelFWriter); code = tsdbUpdateDelFileHdr(pCommitter->pDelFWriter);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbFSUpsertDelFile(&pCommitter->fs, &pCommitter->pDelFWriter->fDel); code = tsdbFSUpsertDelFile(&pCommitter->fs, &pCommitter->pDelFWriter->fDel);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbDelFWriterClose(&pCommitter->pDelFWriter, 1); code = tsdbDelFWriterClose(&pCommitter->pDelFWriter, 1);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
if (pCommitter->pDelFReader) { if (pCommitter->pDelFReader) {
code = tsdbDelFReaderClose(&pCommitter->pDelFReader); code = tsdbDelFReaderClose(&pCommitter->pDelFReader);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
taosArrayDestroy(pCommitter->aDelIdx); taosArrayDestroy(pCommitter->aDelIdx);
taosArrayDestroy(pCommitter->aDelData); taosArrayDestroy(pCommitter->aDelData);
taosArrayDestroy(pCommitter->aDelIdxN); taosArrayDestroy(pCommitter->aDelIdxN);
return code; _exit:
if (code) {
_err: tsdbError("vgId:%d, %s failed at %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code));
tsdbError("vgId:%d, commit del end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); }
return code; return code;
} }
int32_t tsdbUpdateTableSchema(SMeta *pMeta, int64_t suid, int64_t uid, SSkmInfo *pSkmInfo) { int32_t tsdbUpdateTableSchema(SMeta *pMeta, int64_t suid, int64_t uid, SSkmInfo *pSkmInfo) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
if (suid) { if (suid) {
if (pSkmInfo->suid == suid) { if (pSkmInfo->suid == suid) {
...@@ -336,7 +335,7 @@ int32_t tsdbUpdateTableSchema(SMeta *pMeta, int64_t suid, int64_t uid, SSkmInfo ...@@ -336,7 +335,7 @@ int32_t tsdbUpdateTableSchema(SMeta *pMeta, int64_t suid, int64_t uid, SSkmInfo
pSkmInfo->uid = uid; pSkmInfo->uid = uid;
tTSchemaDestroy(pSkmInfo->pTSchema); tTSchemaDestroy(pSkmInfo->pTSchema);
code = metaGetTbTSchemaEx(pMeta, suid, uid, -1, &pSkmInfo->pTSchema); code = metaGetTbTSchemaEx(pMeta, suid, uid, -1, &pSkmInfo->pTSchema);
if (code) goto _exit; TSDB_CHECK_CODE(code, lino, _exit);
_exit: _exit:
return code; return code;
...@@ -344,6 +343,7 @@ _exit: ...@@ -344,6 +343,7 @@ _exit:
static int32_t tsdbCommitterUpdateRowSchema(SCommitter *pCommitter, int64_t suid, int64_t uid, int32_t sver) { static int32_t tsdbCommitterUpdateRowSchema(SCommitter *pCommitter, int64_t suid, int64_t uid, int32_t sver) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
if (pCommitter->skmRow.pTSchema) { if (pCommitter->skmRow.pTSchema) {
if (pCommitter->skmRow.suid == suid) { if (pCommitter->skmRow.suid == suid) {
...@@ -359,9 +359,7 @@ static int32_t tsdbCommitterUpdateRowSchema(SCommitter *pCommitter, int64_t suid ...@@ -359,9 +359,7 @@ static int32_t tsdbCommitterUpdateRowSchema(SCommitter *pCommitter, int64_t suid
pCommitter->skmRow.uid = uid; pCommitter->skmRow.uid = uid;
tTSchemaDestroy(pCommitter->skmRow.pTSchema); tTSchemaDestroy(pCommitter->skmRow.pTSchema);
code = metaGetTbTSchemaEx(pCommitter->pTsdb->pVnode->pMeta, suid, uid, sver, &pCommitter->skmRow.pTSchema); code = metaGetTbTSchemaEx(pCommitter->pTsdb->pVnode->pMeta, suid, uid, sver, &pCommitter->skmRow.pTSchema);
if (code) { TSDB_CHECK_CODE(code, lino, _exit);
goto _exit;
}
_exit: _exit:
return code; return code;
...@@ -369,6 +367,7 @@ _exit: ...@@ -369,6 +367,7 @@ _exit:
static int32_t tsdbCommitterNextTableData(SCommitter *pCommitter) { static int32_t tsdbCommitterNextTableData(SCommitter *pCommitter) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
ASSERT(pCommitter->dReader.pBlockIdx); ASSERT(pCommitter->dReader.pBlockIdx);
...@@ -378,7 +377,7 @@ static int32_t tsdbCommitterNextTableData(SCommitter *pCommitter) { ...@@ -378,7 +377,7 @@ static int32_t tsdbCommitterNextTableData(SCommitter *pCommitter) {
(SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, pCommitter->dReader.iBlockIdx); (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, pCommitter->dReader.iBlockIdx);
code = tsdbReadDataBlk(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock); code = tsdbReadDataBlk(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock);
if (code) goto _exit; TSDB_CHECK_CODE(code, lino, _exit);
ASSERT(pCommitter->dReader.mBlock.nItem > 0); ASSERT(pCommitter->dReader.mBlock.nItem > 0);
} else { } else {
...@@ -391,6 +390,7 @@ _exit: ...@@ -391,6 +390,7 @@ _exit:
static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) { static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
pCommitter->pIter = NULL; pCommitter->pIter = NULL;
tRBTreeCreate(&pCommitter->rbt, tRowInfoCmprFn); tRBTreeCreate(&pCommitter->rbt, tRowInfoCmprFn);
...@@ -431,14 +431,14 @@ static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) { ...@@ -431,14 +431,14 @@ static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) {
pIter->iStt = iStt; pIter->iStt = iStt;
code = tsdbReadSttBlk(pCommitter->dReader.pReader, iStt, pIter->aSttBlk); code = tsdbReadSttBlk(pCommitter->dReader.pReader, iStt, pIter->aSttBlk);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
if (taosArrayGetSize(pIter->aSttBlk) == 0) continue; if (taosArrayGetSize(pIter->aSttBlk) == 0) continue;
pIter->iSttBlk = 0; pIter->iSttBlk = 0;
SSttBlk *pSttBlk = (SSttBlk *)taosArrayGet(pIter->aSttBlk, 0); SSttBlk *pSttBlk = (SSttBlk *)taosArrayGet(pIter->aSttBlk, 0);
code = tsdbReadSttBlock(pCommitter->dReader.pReader, iStt, pSttBlk, &pIter->bData); code = tsdbReadSttBlock(pCommitter->dReader.pReader, iStt, pSttBlk, &pIter->bData);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
pIter->iRow = 0; pIter->iRow = 0;
pIter->r.suid = pIter->bData.suid; pIter->r.suid = pIter->bData.suid;
...@@ -460,16 +460,18 @@ static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) { ...@@ -460,16 +460,18 @@ static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) {
} }
code = tsdbNextCommitRow(pCommitter); code = tsdbNextCommitRow(pCommitter);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
return code;
_err: _exit:
if (code) {
tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code));
}
return code; return code;
} }
static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
STsdb *pTsdb = pCommitter->pTsdb; STsdb *pTsdb = pCommitter->pTsdb;
SDFileSet *pRSet = NULL; SDFileSet *pRSet = NULL;
...@@ -484,17 +486,17 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { ...@@ -484,17 +486,17 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
pRSet = (SDFileSet *)taosArraySearch(pCommitter->fs.aDFileSet, &tDFileSet, tDFileSetCmprFn, TD_EQ); pRSet = (SDFileSet *)taosArraySearch(pCommitter->fs.aDFileSet, &tDFileSet, tDFileSetCmprFn, TD_EQ);
if (pRSet) { if (pRSet) {
code = tsdbDataFReaderOpen(&pCommitter->dReader.pReader, pTsdb, pRSet); code = tsdbDataFReaderOpen(&pCommitter->dReader.pReader, pTsdb, pRSet);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
// data // data
code = tsdbReadBlockIdx(pCommitter->dReader.pReader, pCommitter->dReader.aBlockIdx); code = tsdbReadBlockIdx(pCommitter->dReader.pReader, pCommitter->dReader.aBlockIdx);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
pCommitter->dReader.iBlockIdx = 0; pCommitter->dReader.iBlockIdx = 0;
if (taosArrayGetSize(pCommitter->dReader.aBlockIdx) > 0) { if (taosArrayGetSize(pCommitter->dReader.aBlockIdx) > 0) {
pCommitter->dReader.pBlockIdx = (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, 0); pCommitter->dReader.pBlockIdx = (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, 0);
code = tsdbReadDataBlk(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock); code = tsdbReadDataBlk(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} else { } else {
pCommitter->dReader.pBlockIdx = NULL; pCommitter->dReader.pBlockIdx = NULL;
} }
...@@ -531,7 +533,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { ...@@ -531,7 +533,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
} }
wSet.aSttF[wSet.nSttF - 1] = &fStt; wSet.aSttF[wSet.nSttF - 1] = &fStt;
code = tsdbDataFWriterOpen(&pCommitter->dWriter.pWriter, pTsdb, &wSet); code = tsdbDataFWriterOpen(&pCommitter->dWriter.pWriter, pTsdb, &wSet);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
taosArrayClear(pCommitter->dWriter.aBlockIdx); taosArrayClear(pCommitter->dWriter.aBlockIdx);
taosArrayClear(pCommitter->dWriter.aSttBlk); taosArrayClear(pCommitter->dWriter.aSttBlk);
...@@ -541,18 +543,18 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { ...@@ -541,18 +543,18 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
// open iter // open iter
code = tsdbOpenCommitIter(pCommitter); code = tsdbOpenCommitIter(pCommitter);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
_exit: _exit:
return code; if (code) {
tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
_err: }
tsdbError("vgId:%d, commit file data start failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code; return code;
} }
int32_t tsdbWriteDataBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SMapData *mDataBlk, int8_t cmprAlg) { int32_t tsdbWriteDataBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SMapData *mDataBlk, int8_t cmprAlg) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
if (pBlockData->nRow == 0) return code; if (pBlockData->nRow == 0) return code;
...@@ -586,24 +588,25 @@ int32_t tsdbWriteDataBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SMapDa ...@@ -586,24 +588,25 @@ int32_t tsdbWriteDataBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SMapDa
dataBlk.nSubBlock++; dataBlk.nSubBlock++;
code = tsdbWriteBlockData(pWriter, pBlockData, &dataBlk.aSubBlock[dataBlk.nSubBlock - 1], code = tsdbWriteBlockData(pWriter, pBlockData, &dataBlk.aSubBlock[dataBlk.nSubBlock - 1],
((dataBlk.nSubBlock == 1) && !dataBlk.hasDup) ? &dataBlk.smaInfo : NULL, cmprAlg, 0); ((dataBlk.nSubBlock == 1) && !dataBlk.hasDup) ? &dataBlk.smaInfo : NULL, cmprAlg, 0);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
// put SDataBlk // put SDataBlk
code = tMapDataPutItem(mDataBlk, &dataBlk, tPutDataBlk); code = tMapDataPutItem(mDataBlk, &dataBlk, tPutDataBlk);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
// clear // clear
tBlockDataClear(pBlockData); tBlockDataClear(pBlockData);
return code; _exit:
if (code) {
_err: tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
tsdbError("vgId:%d tsdb commit data block failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); }
return code; return code;
} }
int32_t tsdbWriteSttBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SArray *aSttBlk, int8_t cmprAlg) { int32_t tsdbWriteSttBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SArray *aSttBlk, int8_t cmprAlg) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
SSttBlk sstBlk; SSttBlk sstBlk;
if (pBlockData->nRow == 0) return code; if (pBlockData->nRow == 0) return code;
...@@ -626,114 +629,117 @@ int32_t tsdbWriteSttBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SArray ...@@ -626,114 +629,117 @@ int32_t tsdbWriteSttBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SArray
// write // write
code = tsdbWriteBlockData(pWriter, pBlockData, &sstBlk.bInfo, NULL, cmprAlg, 1); code = tsdbWriteBlockData(pWriter, pBlockData, &sstBlk.bInfo, NULL, cmprAlg, 1);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
// push SSttBlk // push SSttBlk
if (taosArrayPush(aSttBlk, &sstBlk) == NULL) { if (taosArrayPush(aSttBlk, &sstBlk) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
// clear // clear
tBlockDataClear(pBlockData); tBlockDataClear(pBlockData);
return code; _exit:
if (code) {
_err: tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
tsdbError("vgId:%d tsdb commit last block failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); }
return code; return code;
} }
static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) { static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
// write aBlockIdx // write aBlockIdx
code = tsdbWriteBlockIdx(pCommitter->dWriter.pWriter, pCommitter->dWriter.aBlockIdx); code = tsdbWriteBlockIdx(pCommitter->dWriter.pWriter, pCommitter->dWriter.aBlockIdx);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
// write aSttBlk // write aSttBlk
code = tsdbWriteSttBlk(pCommitter->dWriter.pWriter, pCommitter->dWriter.aSttBlk); code = tsdbWriteSttBlk(pCommitter->dWriter.pWriter, pCommitter->dWriter.aSttBlk);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
// update file header // update file header
code = tsdbUpdateDFileSetHeader(pCommitter->dWriter.pWriter); code = tsdbUpdateDFileSetHeader(pCommitter->dWriter.pWriter);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
// upsert SDFileSet // upsert SDFileSet
code = tsdbFSUpsertFSet(&pCommitter->fs, &pCommitter->dWriter.pWriter->wSet); code = tsdbFSUpsertFSet(&pCommitter->fs, &pCommitter->dWriter.pWriter->wSet);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
// close and sync // close and sync
code = tsdbDataFWriterClose(&pCommitter->dWriter.pWriter, 1); code = tsdbDataFWriterClose(&pCommitter->dWriter.pWriter, 1);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
if (pCommitter->dReader.pReader) { if (pCommitter->dReader.pReader) {
code = tsdbDataFReaderClose(&pCommitter->dReader.pReader); code = tsdbDataFReaderClose(&pCommitter->dReader.pReader);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
_exit: _exit:
return code; if (code) {
tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code));
_err: }
tsdbError("vgId:%d, commit file data end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
return code; return code;
} }
static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) { static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
while (pCommitter->dReader.pBlockIdx && tTABLEIDCmprFn(pCommitter->dReader.pBlockIdx, &toTable) < 0) { while (pCommitter->dReader.pBlockIdx && tTABLEIDCmprFn(pCommitter->dReader.pBlockIdx, &toTable) < 0) {
SBlockIdx blockIdx = *pCommitter->dReader.pBlockIdx; SBlockIdx blockIdx = *pCommitter->dReader.pBlockIdx;
code = tsdbWriteDataBlk(pCommitter->dWriter.pWriter, &pCommitter->dReader.mBlock, &blockIdx); code = tsdbWriteDataBlk(pCommitter->dWriter.pWriter, &pCommitter->dReader.mBlock, &blockIdx);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
if (taosArrayPush(pCommitter->dWriter.aBlockIdx, &blockIdx) == NULL) { if (taosArrayPush(pCommitter->dWriter.aBlockIdx, &blockIdx) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
code = tsdbCommitterNextTableData(pCommitter); code = tsdbCommitterNextTableData(pCommitter);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
return code; _exit:
if (code) {
_err: tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code));
tsdbError("vgId:%d tsdb move commit data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); }
return code; return code;
} }
static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter); static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter);
static int32_t tsdbCommitFileData(SCommitter *pCommitter) { static int32_t tsdbCommitFileData(SCommitter *pCommitter) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
STsdb *pTsdb = pCommitter->pTsdb; STsdb *pTsdb = pCommitter->pTsdb;
SMemTable *pMemTable = pTsdb->imem; SMemTable *pMemTable = pTsdb->imem;
// commit file data start // commit file data start
code = tsdbCommitFileDataStart(pCommitter); code = tsdbCommitFileDataStart(pCommitter);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
// impl // impl
code = tsdbCommitFileDataImpl(pCommitter); code = tsdbCommitFileDataImpl(pCommitter);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
// commit file data end // commit file data end
code = tsdbCommitFileDataEnd(pCommitter); code = tsdbCommitFileDataEnd(pCommitter);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
return code;
_err: _exit:
tsdbError("vgId:%d, commit file data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); if (code) {
tsdbDataFReaderClose(&pCommitter->dReader.pReader); tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
tsdbDataFWriterClose(&pCommitter->dWriter.pWriter, 0); tsdbDataFReaderClose(&pCommitter->dReader.pReader);
tsdbDataFWriterClose(&pCommitter->dWriter.pWriter, 0);
}
return code; return code;
} }
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) { static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
memset(pCommitter, 0, sizeof(*pCommitter)); memset(pCommitter, 0, sizeof(*pCommitter));
ASSERT(pTsdb->mem && pTsdb->imem == NULL); ASSERT(pTsdb->mem && pTsdb->imem == NULL);
...@@ -754,30 +760,31 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) { ...@@ -754,30 +760,31 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
pCommitter->aTbDataP = tsdbMemTableGetTbDataArray(pTsdb->imem); pCommitter->aTbDataP = tsdbMemTableGetTbDataArray(pTsdb->imem);
if (pCommitter->aTbDataP == NULL) { if (pCommitter->aTbDataP == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
code = tsdbFSCopy(pTsdb, &pCommitter->fs); code = tsdbFSCopy(pTsdb, &pCommitter->fs);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
return code; _exit:
if (code) {
_err: tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
tsdbError("vgId:%d, tsdb start commit failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); }
return code; return code;
} }
static int32_t tsdbCommitDataStart(SCommitter *pCommitter) { static int32_t tsdbCommitDataStart(SCommitter *pCommitter) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
// reader // reader
pCommitter->dReader.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); pCommitter->dReader.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
if (pCommitter->dReader.aBlockIdx == NULL) { if (pCommitter->dReader.aBlockIdx == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; TSDB_CHECK_CODE(code, lino, _exit);
} }
code = tBlockDataCreate(&pCommitter->dReader.bData); code = tBlockDataCreate(&pCommitter->dReader.bData);
if (code) goto _exit; TSDB_CHECK_CODE(code, lino, _exit);
// merger // merger
for (int32_t iStt = 0; iStt < TSDB_MAX_STT_TRIGGER; iStt++) { for (int32_t iStt = 0; iStt < TSDB_MAX_STT_TRIGGER; iStt++) {
...@@ -785,33 +792,36 @@ static int32_t tsdbCommitDataStart(SCommitter *pCommitter) { ...@@ -785,33 +792,36 @@ static int32_t tsdbCommitDataStart(SCommitter *pCommitter) {
pIter->aSttBlk = taosArrayInit(0, sizeof(SSttBlk)); pIter->aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
if (pIter->aSttBlk == NULL) { if (pIter->aSttBlk == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; TSDB_CHECK_CODE(code, lino, _exit);
} }
code = tBlockDataCreate(&pIter->bData); code = tBlockDataCreate(&pIter->bData);
if (code) goto _exit; TSDB_CHECK_CODE(code, lino, _exit);
} }
// writer // writer
pCommitter->dWriter.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); pCommitter->dWriter.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
if (pCommitter->dWriter.aBlockIdx == NULL) { if (pCommitter->dWriter.aBlockIdx == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; TSDB_CHECK_CODE(code, lino, _exit);
} }
pCommitter->dWriter.aSttBlk = taosArrayInit(0, sizeof(SSttBlk)); pCommitter->dWriter.aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
if (pCommitter->dWriter.aSttBlk == NULL) { if (pCommitter->dWriter.aSttBlk == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; TSDB_CHECK_CODE(code, lino, _exit);
} }
code = tBlockDataCreate(&pCommitter->dWriter.bData); code = tBlockDataCreate(&pCommitter->dWriter.bData);
if (code) goto _exit; TSDB_CHECK_CODE(code, lino, _exit);
code = tBlockDataCreate(&pCommitter->dWriter.bDatal); code = tBlockDataCreate(&pCommitter->dWriter.bDatal);
if (code) goto _exit; TSDB_CHECK_CODE(code, lino, _exit);
_exit: _exit:
if (code) {
tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code));
}
return code; return code;
} }
...@@ -839,7 +849,9 @@ static void tsdbCommitDataEnd(SCommitter *pCommitter) { ...@@ -839,7 +849,9 @@ static void tsdbCommitDataEnd(SCommitter *pCommitter) {
} }
static int32_t tsdbCommitData(SCommitter *pCommitter) { static int32_t tsdbCommitData(SCommitter *pCommitter) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
STsdb *pTsdb = pCommitter->pTsdb; STsdb *pTsdb = pCommitter->pTsdb;
SMemTable *pMemTable = pTsdb->imem; SMemTable *pMemTable = pTsdb->imem;
...@@ -848,30 +860,29 @@ static int32_t tsdbCommitData(SCommitter *pCommitter) { ...@@ -848,30 +860,29 @@ static int32_t tsdbCommitData(SCommitter *pCommitter) {
// start ==================== // start ====================
code = tsdbCommitDataStart(pCommitter); code = tsdbCommitDataStart(pCommitter);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
// impl ==================== // impl ====================
pCommitter->nextKey = pMemTable->minKey; pCommitter->nextKey = pMemTable->minKey;
while (pCommitter->nextKey < TSKEY_MAX) { while (pCommitter->nextKey < TSKEY_MAX) {
code = tsdbCommitFileData(pCommitter); code = tsdbCommitFileData(pCommitter);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
// end ==================== // end ====================
tsdbCommitDataEnd(pCommitter); tsdbCommitDataEnd(pCommitter);
_exit: _exit:
tsdbInfo("vgId:%d, commit data done, nRow:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nRow); if (code) {
return code; tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
}
_err:
tsdbCommitDataEnd(pCommitter);
tsdbError("vgId:%d, commit data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code; return code;
} }
static int32_t tsdbCommitDel(SCommitter *pCommitter) { static int32_t tsdbCommitDel(SCommitter *pCommitter) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
STsdb *pTsdb = pCommitter->pTsdb; STsdb *pTsdb = pCommitter->pTsdb;
SMemTable *pMemTable = pTsdb->imem; SMemTable *pMemTable = pTsdb->imem;
...@@ -882,7 +893,7 @@ static int32_t tsdbCommitDel(SCommitter *pCommitter) { ...@@ -882,7 +893,7 @@ static int32_t tsdbCommitDel(SCommitter *pCommitter) {
// start // start
code = tsdbCommitDelStart(pCommitter); code = tsdbCommitDelStart(pCommitter);
if (code) { if (code) {
goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
// impl // impl
...@@ -918,7 +929,7 @@ static int32_t tsdbCommitDel(SCommitter *pCommitter) { ...@@ -918,7 +929,7 @@ static int32_t tsdbCommitDel(SCommitter *pCommitter) {
_commit_mem_del: _commit_mem_del:
code = tsdbCommitTableDel(pCommitter, pTbData, NULL); code = tsdbCommitTableDel(pCommitter, pTbData, NULL);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
iTbData++; iTbData++;
pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData) : NULL; pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData) : NULL;
...@@ -926,7 +937,7 @@ static int32_t tsdbCommitDel(SCommitter *pCommitter) { ...@@ -926,7 +937,7 @@ static int32_t tsdbCommitDel(SCommitter *pCommitter) {
_commit_disk_del: _commit_disk_del:
code = tsdbCommitTableDel(pCommitter, NULL, pDelIdx); code = tsdbCommitTableDel(pCommitter, NULL, pDelIdx);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
iDelIdx++; iDelIdx++;
pDelIdx = (iDelIdx < nDelIdx) ? (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, iDelIdx) : NULL; pDelIdx = (iDelIdx < nDelIdx) ? (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, iDelIdx) : NULL;
...@@ -934,7 +945,7 @@ static int32_t tsdbCommitDel(SCommitter *pCommitter) { ...@@ -934,7 +945,7 @@ static int32_t tsdbCommitDel(SCommitter *pCommitter) {
_commit_mem_and_disk_del: _commit_mem_and_disk_del:
code = tsdbCommitTableDel(pCommitter, pTbData, pDelIdx); code = tsdbCommitTableDel(pCommitter, pTbData, pDelIdx);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
iTbData++; iTbData++;
pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData) : NULL; pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData) : NULL;
...@@ -945,28 +956,28 @@ static int32_t tsdbCommitDel(SCommitter *pCommitter) { ...@@ -945,28 +956,28 @@ static int32_t tsdbCommitDel(SCommitter *pCommitter) {
// end // end
code = tsdbCommitDelEnd(pCommitter); code = tsdbCommitDelEnd(pCommitter);
if (code) { TSDB_CHECK_CODE(code, lino, _exit);
goto _err;
}
_exit: _exit:
tsdbDebug("vgId:%d, commit del done, nDel:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nDel); if (code) {
return code; tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
} else {
_err: tsdbDebug("vgId:%d commit del done, nDel:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nDel);
tsdbError("vgId:%d, commit del failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); }
return code; return code;
} }
static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) { static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
STsdb *pTsdb = pCommitter->pTsdb; STsdb *pTsdb = pCommitter->pTsdb;
SMemTable *pMemTable = pTsdb->imem; SMemTable *pMemTable = pTsdb->imem;
ASSERT(eno == 0); ASSERT(eno == 0);
code = tsdbFSCommit1(pTsdb, &pCommitter->fs); code = tsdbFSCommit1(pTsdb, &pCommitter->fs);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
// lock // lock
taosThreadRwlockWrlock(&pTsdb->rwLock); taosThreadRwlockWrlock(&pTsdb->rwLock);
...@@ -975,7 +986,7 @@ static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) { ...@@ -975,7 +986,7 @@ static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) {
code = tsdbFSCommit2(pTsdb, &pCommitter->fs); code = tsdbFSCommit2(pTsdb, &pCommitter->fs);
if (code) { if (code) {
taosThreadRwlockUnlock(&pTsdb->rwLock); taosThreadRwlockUnlock(&pTsdb->rwLock);
goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
pTsdb->imem = NULL; pTsdb->imem = NULL;
...@@ -987,16 +998,12 @@ static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) { ...@@ -987,16 +998,12 @@ static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) {
tsdbFSDestroy(&pCommitter->fs); tsdbFSDestroy(&pCommitter->fs);
taosArrayDestroy(pCommitter->aTbDataP); taosArrayDestroy(pCommitter->aTbDataP);
// if (pCommitter->toMerge) { _exit:
// code = tsdbMerge(pTsdb); if (code) {
// if (code) goto _err; tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
// } } else {
tsdbInfo("vgId:%d tsdb end commit", TD_VID(pTsdb->pVnode));
tsdbInfo("vgId:%d, tsdb end commit", TD_VID(pTsdb->pVnode)); }
return code;
_err:
tsdbError("vgId:%d, tsdb end commit failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code; return code;
} }
...@@ -1008,6 +1015,7 @@ static FORCE_INLINE SRowInfo *tsdbGetCommitRow(SCommitter *pCommitter) { ...@@ -1008,6 +1015,7 @@ static FORCE_INLINE SRowInfo *tsdbGetCommitRow(SCommitter *pCommitter) {
static int32_t tsdbNextCommitRow(SCommitter *pCommitter) { static int32_t tsdbNextCommitRow(SCommitter *pCommitter) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
if (pCommitter->pIter) { if (pCommitter->pIter) {
SDataIter *pIter = pCommitter->pIter; SDataIter *pIter = pCommitter->pIter;
...@@ -1050,7 +1058,7 @@ static int32_t tsdbNextCommitRow(SCommitter *pCommitter) { ...@@ -1050,7 +1058,7 @@ static int32_t tsdbNextCommitRow(SCommitter *pCommitter) {
SSttBlk *pSttBlk = (SSttBlk *)taosArrayGet(pIter->aSttBlk, pIter->iSttBlk); SSttBlk *pSttBlk = (SSttBlk *)taosArrayGet(pIter->aSttBlk, pIter->iSttBlk);
code = tsdbReadSttBlock(pCommitter->dReader.pReader, pIter->iStt, pSttBlk, &pIter->bData); code = tsdbReadSttBlock(pCommitter->dReader.pReader, pIter->iStt, pSttBlk, &pIter->bData);
if (code) goto _exit; TSDB_CHECK_CODE(code, lino, _exit);
pIter->iRow = 0; pIter->iRow = 0;
pIter->r.suid = pIter->bData.suid; pIter->r.suid = pIter->bData.suid;
...@@ -1085,11 +1093,16 @@ static int32_t tsdbNextCommitRow(SCommitter *pCommitter) { ...@@ -1085,11 +1093,16 @@ static int32_t tsdbNextCommitRow(SCommitter *pCommitter) {
} }
_exit: _exit:
if (code) {
tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code));
}
return code; return code;
} }
static int32_t tsdbCommitAheadBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) { static int32_t tsdbCommitAheadBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
SBlockData *pBlockData = &pCommitter->dWriter.bData; SBlockData *pBlockData = &pCommitter->dWriter.bData;
SRowInfo *pRowInfo = tsdbGetCommitRow(pCommitter); SRowInfo *pRowInfo = tsdbGetCommitRow(pCommitter);
TABLEID id = {.suid = pRowInfo->suid, .uid = pRowInfo->uid}; TABLEID id = {.suid = pRowInfo->suid, .uid = pRowInfo->uid};
...@@ -1098,13 +1111,13 @@ static int32_t tsdbCommitAheadBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) ...@@ -1098,13 +1111,13 @@ static int32_t tsdbCommitAheadBlock(SCommitter *pCommitter, SDataBlk *pDataBlk)
while (pRowInfo) { while (pRowInfo) {
ASSERT(pRowInfo->row.type == 0); ASSERT(pRowInfo->row.type == 0);
code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row)); code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row));
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
code = tBlockDataAppendRow(pBlockData, &pRowInfo->row, pCommitter->skmRow.pTSchema, id.uid); code = tBlockDataAppendRow(pBlockData, &pRowInfo->row, pCommitter->skmRow.pTSchema, id.uid);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbNextCommitRow(pCommitter); code = tsdbNextCommitRow(pCommitter);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
pRowInfo = tsdbGetCommitRow(pCommitter); pRowInfo = tsdbGetCommitRow(pCommitter);
if (pRowInfo) { if (pRowInfo) {
...@@ -1119,29 +1132,31 @@ static int32_t tsdbCommitAheadBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) ...@@ -1119,29 +1132,31 @@ static int32_t tsdbCommitAheadBlock(SCommitter *pCommitter, SDataBlk *pDataBlk)
if (pBlockData->nRow >= pCommitter->maxRow) { if (pBlockData->nRow >= pCommitter->maxRow) {
code = code =
tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBlockData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg); tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBlockData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
} }
code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBlockData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg); code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBlockData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
return code; _exit:
if (code) {
_err: tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code));
tsdbError("vgId:%d, tsdb commit ahead block failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); }
return code; return code;
} }
static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) { static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
SRowInfo *pRowInfo = tsdbGetCommitRow(pCommitter); SRowInfo *pRowInfo = tsdbGetCommitRow(pCommitter);
TABLEID id = {.suid = pRowInfo->suid, .uid = pRowInfo->uid}; TABLEID id = {.suid = pRowInfo->suid, .uid = pRowInfo->uid};
SBlockData *pBDataR = &pCommitter->dReader.bData; SBlockData *pBDataR = &pCommitter->dReader.bData;
SBlockData *pBDataW = &pCommitter->dWriter.bData; SBlockData *pBDataW = &pCommitter->dWriter.bData;
code = tsdbReadDataBlock(pCommitter->dReader.pReader, pDataBlk, pBDataR); code = tsdbReadDataBlock(pCommitter->dReader.pReader, pDataBlk, pBDataR);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
tBlockDataClear(pBDataW); tBlockDataClear(pBDataW);
int32_t iRow = 0; int32_t iRow = 0;
...@@ -1152,7 +1167,7 @@ static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) ...@@ -1152,7 +1167,7 @@ static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SDataBlk *pDataBlk)
int32_t c = tsdbRowCmprFn(pRow, &pRowInfo->row); int32_t c = tsdbRowCmprFn(pRow, &pRowInfo->row);
if (c < 0) { if (c < 0) {
code = tBlockDataAppendRow(pBDataW, pRow, NULL, id.uid); code = tBlockDataAppendRow(pBDataW, pRow, NULL, id.uid);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
iRow++; iRow++;
if (iRow < pBDataR->nRow) { if (iRow < pBDataR->nRow) {
...@@ -1163,13 +1178,13 @@ static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) ...@@ -1163,13 +1178,13 @@ static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SDataBlk *pDataBlk)
} else if (c > 0) { } else if (c > 0) {
ASSERT(pRowInfo->row.type == 0); ASSERT(pRowInfo->row.type == 0);
code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row)); code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row));
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
code = tBlockDataAppendRow(pBDataW, &pRowInfo->row, pCommitter->skmRow.pTSchema, id.uid); code = tBlockDataAppendRow(pBDataW, &pRowInfo->row, pCommitter->skmRow.pTSchema, id.uid);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbNextCommitRow(pCommitter); code = tsdbNextCommitRow(pCommitter);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
pRowInfo = tsdbGetCommitRow(pCommitter); pRowInfo = tsdbGetCommitRow(pCommitter);
if (pRowInfo) { if (pRowInfo) {
...@@ -1186,13 +1201,13 @@ static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) ...@@ -1186,13 +1201,13 @@ static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SDataBlk *pDataBlk)
if (pBDataW->nRow >= pCommitter->maxRow) { if (pBDataW->nRow >= pCommitter->maxRow) {
code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBDataW, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg); code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBDataW, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
} }
while (pRow) { while (pRow) {
code = tBlockDataAppendRow(pBDataW, pRow, NULL, id.uid); code = tBlockDataAppendRow(pBDataW, pRow, NULL, id.uid);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
iRow++; iRow++;
if (iRow < pBDataR->nRow) { if (iRow < pBDataR->nRow) {
...@@ -1203,22 +1218,24 @@ static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) ...@@ -1203,22 +1218,24 @@ static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SDataBlk *pDataBlk)
if (pBDataW->nRow >= pCommitter->maxRow) { if (pBDataW->nRow >= pCommitter->maxRow) {
code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBDataW, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg); code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBDataW, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
} }
code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBDataW, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg); code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBDataW, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
return code;
_err: _exit:
tsdbError("vgId:%d, tsdb commit merge block failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); if (code) {
tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code));
}
return code; return code;
} }
static int32_t tsdbMergeTableData(SCommitter *pCommitter, TABLEID id) { static int32_t tsdbMergeTableData(SCommitter *pCommitter, TABLEID id) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
SBlockIdx *pBlockIdx = pCommitter->dReader.pBlockIdx; SBlockIdx *pBlockIdx = pCommitter->dReader.pBlockIdx;
ASSERT(pBlockIdx == NULL || tTABLEIDCmprFn(pBlockIdx, &id) >= 0); ASSERT(pBlockIdx == NULL || tTABLEIDCmprFn(pBlockIdx, &id) >= 0);
...@@ -1237,7 +1254,7 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, TABLEID id) { ...@@ -1237,7 +1254,7 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, TABLEID id) {
if (c < 0) { if (c < 0) {
code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pDataBlk, tPutDataBlk); code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pDataBlk, tPutDataBlk);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
iBlock++; iBlock++;
if (iBlock < pCommitter->dReader.mBlock.nItem) { if (iBlock < pCommitter->dReader.mBlock.nItem) {
...@@ -1247,13 +1264,13 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, TABLEID id) { ...@@ -1247,13 +1264,13 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, TABLEID id) {
} }
} else if (c > 0) { } else if (c > 0) {
code = tsdbCommitAheadBlock(pCommitter, pDataBlk); code = tsdbCommitAheadBlock(pCommitter, pDataBlk);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
pRowInfo = tsdbGetCommitRow(pCommitter); pRowInfo = tsdbGetCommitRow(pCommitter);
if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) pRowInfo = NULL; if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) pRowInfo = NULL;
} else { } else {
code = tsdbCommitMergeBlock(pCommitter, pDataBlk); code = tsdbCommitMergeBlock(pCommitter, pDataBlk);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
iBlock++; iBlock++;
if (iBlock < pCommitter->dReader.mBlock.nItem) { if (iBlock < pCommitter->dReader.mBlock.nItem) {
...@@ -1268,7 +1285,7 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, TABLEID id) { ...@@ -1268,7 +1285,7 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, TABLEID id) {
while (pDataBlk) { while (pDataBlk) {
code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pDataBlk, tPutDataBlk); code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pDataBlk, tPutDataBlk);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
iBlock++; iBlock++;
if (iBlock < pCommitter->dReader.mBlock.nItem) { if (iBlock < pCommitter->dReader.mBlock.nItem) {
...@@ -1279,25 +1296,25 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, TABLEID id) { ...@@ -1279,25 +1296,25 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, TABLEID id) {
} }
code = tsdbCommitterNextTableData(pCommitter); code = tsdbCommitterNextTableData(pCommitter);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
_exit: _exit:
return code; if (code) {
tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code));
_err: }
tsdbError("vgId:%d tsdb merge table data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
return code; return code;
} }
static int32_t tsdbInitLastBlockIfNeed(SCommitter *pCommitter, TABLEID id) { static int32_t tsdbInitLastBlockIfNeed(SCommitter *pCommitter, TABLEID id) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
SBlockData *pBDatal = &pCommitter->dWriter.bDatal; SBlockData *pBDatal = &pCommitter->dWriter.bDatal;
if (pBDatal->suid || pBDatal->uid) { if (pBDatal->suid || pBDatal->uid) {
if ((pBDatal->suid != id.suid) || (id.suid == 0)) { if ((pBDatal->suid != id.suid) || (id.suid == 0)) {
code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, pBDatal, pCommitter->dWriter.aSttBlk, pCommitter->cmprAlg); code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, pBDatal, pCommitter->dWriter.aSttBlk, pCommitter->cmprAlg);
if (code) goto _exit; TSDB_CHECK_CODE(code, lino, _exit);
tBlockDataReset(pBDatal); tBlockDataReset(pBDatal);
} }
} }
...@@ -1306,42 +1323,48 @@ static int32_t tsdbInitLastBlockIfNeed(SCommitter *pCommitter, TABLEID id) { ...@@ -1306,42 +1323,48 @@ static int32_t tsdbInitLastBlockIfNeed(SCommitter *pCommitter, TABLEID id) {
ASSERT(pCommitter->skmTable.suid == id.suid); ASSERT(pCommitter->skmTable.suid == id.suid);
ASSERT(pCommitter->skmTable.uid == id.uid); ASSERT(pCommitter->skmTable.uid == id.uid);
code = tBlockDataInit(pBDatal, id.suid, id.suid ? 0 : id.uid, pCommitter->skmTable.pTSchema); code = tBlockDataInit(pBDatal, id.suid, id.suid ? 0 : id.uid, pCommitter->skmTable.pTSchema);
if (code) goto _exit; TSDB_CHECK_CODE(code, lino, _exit);
} }
_exit: _exit:
if (code) {
tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code));
}
return code; return code;
} }
static int32_t tsdbAppendLastBlock(SCommitter *pCommitter) { static int32_t tsdbAppendLastBlock(SCommitter *pCommitter) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
SBlockData *pBData = &pCommitter->dWriter.bData; SBlockData *pBData = &pCommitter->dWriter.bData;
SBlockData *pBDatal = &pCommitter->dWriter.bDatal; SBlockData *pBDatal = &pCommitter->dWriter.bDatal;
TABLEID id = {.suid = pBData->suid, .uid = pBData->uid}; TABLEID id = {.suid = pBData->suid, .uid = pBData->uid};
code = tsdbInitLastBlockIfNeed(pCommitter, id); code = tsdbInitLastBlockIfNeed(pCommitter, id);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
for (int32_t iRow = 0; iRow < pBData->nRow; iRow++) { for (int32_t iRow = 0; iRow < pBData->nRow; iRow++) {
TSDBROW row = tsdbRowFromBlockData(pBData, iRow); TSDBROW row = tsdbRowFromBlockData(pBData, iRow);
code = tBlockDataAppendRow(pBDatal, &row, NULL, pBData->uid); code = tBlockDataAppendRow(pBDatal, &row, NULL, pBData->uid);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
if (pBDatal->nRow >= pCommitter->maxRow) { if (pBDatal->nRow >= pCommitter->maxRow) {
code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, pBDatal, pCommitter->dWriter.aSttBlk, pCommitter->cmprAlg); code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, pBDatal, pCommitter->dWriter.aSttBlk, pCommitter->cmprAlg);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
} }
return code; _exit:
if (code) {
_err: tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code));
}
return code; return code;
} }
static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) { static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
SRowInfo *pRowInfo = tsdbGetCommitRow(pCommitter); SRowInfo *pRowInfo = tsdbGetCommitRow(pCommitter);
if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) { if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) {
...@@ -1354,7 +1377,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) { ...@@ -1354,7 +1377,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) {
if (pCommitter->toLastOnly) { if (pCommitter->toLastOnly) {
pBData = &pCommitter->dWriter.bDatal; pBData = &pCommitter->dWriter.bDatal;
code = tsdbInitLastBlockIfNeed(pCommitter, id); code = tsdbInitLastBlockIfNeed(pCommitter, id);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} else { } else {
pBData = &pCommitter->dWriter.bData; pBData = &pCommitter->dWriter.bData;
ASSERT(pBData->nRow == 0); ASSERT(pBData->nRow == 0);
...@@ -1364,15 +1387,15 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) { ...@@ -1364,15 +1387,15 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) {
STSchema *pTSchema = NULL; STSchema *pTSchema = NULL;
if (pRowInfo->row.type == 0) { if (pRowInfo->row.type == 0) {
code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row)); code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row));
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
pTSchema = pCommitter->skmRow.pTSchema; pTSchema = pCommitter->skmRow.pTSchema;
} }
code = tBlockDataAppendRow(pBData, &pRowInfo->row, pTSchema, id.uid); code = tBlockDataAppendRow(pBData, &pRowInfo->row, pTSchema, id.uid);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbNextCommitRow(pCommitter); code = tsdbNextCommitRow(pCommitter);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
pRowInfo = tsdbGetCommitRow(pCommitter); pRowInfo = tsdbGetCommitRow(pCommitter);
if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) { if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) {
...@@ -1382,11 +1405,11 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) { ...@@ -1382,11 +1405,11 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) {
if (pBData->nRow >= pCommitter->maxRow) { if (pBData->nRow >= pCommitter->maxRow) {
if (pCommitter->toLastOnly) { if (pCommitter->toLastOnly) {
code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, pBData, pCommitter->dWriter.aSttBlk, pCommitter->cmprAlg); code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, pBData, pCommitter->dWriter.aSttBlk, pCommitter->cmprAlg);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} else { } else {
code = code =
tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg); tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
} }
} }
...@@ -1394,23 +1417,23 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) { ...@@ -1394,23 +1417,23 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) {
if (!pCommitter->toLastOnly && pBData->nRow) { if (!pCommitter->toLastOnly && pBData->nRow) {
if (pBData->nRow > pCommitter->minRow) { if (pBData->nRow > pCommitter->minRow) {
code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg); code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} else { } else {
code = tsdbAppendLastBlock(pCommitter); code = tsdbAppendLastBlock(pCommitter);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
} }
_exit: _exit:
return code; if (code) {
tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code));
_err: }
tsdbError("vgId:%d tsdb commit table data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
return code; return code;
} }
static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
SRowInfo *pRowInfo; SRowInfo *pRowInfo;
TABLEID id = {0}; TABLEID id = {0};
...@@ -1420,36 +1443,36 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { ...@@ -1420,36 +1443,36 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
id.uid = pRowInfo->uid; id.uid = pRowInfo->uid;
code = tsdbMoveCommitData(pCommitter, id); code = tsdbMoveCommitData(pCommitter, id);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
// start // start
tMapDataReset(&pCommitter->dWriter.mBlock); tMapDataReset(&pCommitter->dWriter.mBlock);
// impl // impl
code = tsdbUpdateTableSchema(pCommitter->pTsdb->pVnode->pMeta, id.suid, id.uid, &pCommitter->skmTable); code = tsdbUpdateTableSchema(pCommitter->pTsdb->pVnode->pMeta, id.suid, id.uid, &pCommitter->skmTable);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
code = tBlockDataInit(&pCommitter->dReader.bData, id.suid, id.uid, pCommitter->skmTable.pTSchema); code = tBlockDataInit(&pCommitter->dReader.bData, id.suid, id.uid, pCommitter->skmTable.pTSchema);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
code = tBlockDataInit(&pCommitter->dWriter.bData, id.suid, id.uid, pCommitter->skmTable.pTSchema); code = tBlockDataInit(&pCommitter->dWriter.bData, id.suid, id.uid, pCommitter->skmTable.pTSchema);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
/* merge with data in .data file */ /* merge with data in .data file */
code = tsdbMergeTableData(pCommitter, id); code = tsdbMergeTableData(pCommitter, id);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
/* handle remain table data */ /* handle remain table data */
code = tsdbCommitTableData(pCommitter, id); code = tsdbCommitTableData(pCommitter, id);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
// end // end
if (pCommitter->dWriter.mBlock.nItem > 0) { if (pCommitter->dWriter.mBlock.nItem > 0) {
SBlockIdx blockIdx = {.suid = id.suid, .uid = id.uid}; SBlockIdx blockIdx = {.suid = id.suid, .uid = id.uid};
code = tsdbWriteDataBlk(pCommitter->dWriter.pWriter, &pCommitter->dWriter.mBlock, &blockIdx); code = tsdbWriteDataBlk(pCommitter->dWriter.pWriter, &pCommitter->dWriter.mBlock, &blockIdx);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
if (taosArrayPush(pCommitter->dWriter.aBlockIdx, &blockIdx) == NULL) { if (taosArrayPush(pCommitter->dWriter.aBlockIdx, &blockIdx) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
} }
} }
...@@ -1457,15 +1480,15 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { ...@@ -1457,15 +1480,15 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
id.suid = INT64_MAX; id.suid = INT64_MAX;
id.uid = INT64_MAX; id.uid = INT64_MAX;
code = tsdbMoveCommitData(pCommitter, id); code = tsdbMoveCommitData(pCommitter, id);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bDatal, pCommitter->dWriter.aSttBlk, code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bDatal, pCommitter->dWriter.aSttBlk,
pCommitter->cmprAlg); pCommitter->cmprAlg);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
return code;
_err: _exit:
tsdbError("vgId:%d tsdb commit file data impl failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); if (code) {
tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code));
}
return code; return code;
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册