From d789fe6ac84300611965803ee43c483ce10297a7 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 23 Sep 2022 10:40:54 +0800 Subject: [PATCH] refact commit code --- source/dnode/vnode/src/tsdb/tsdbCommit.c | 483 ++++++++++++----------- 1 file changed, 253 insertions(+), 230 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index a619b9f2e4..3daab532f3 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -115,33 +115,32 @@ int32_t tRowInfoCmprFn(const void *p1, const void *p2) { int32_t tsdbBegin(STsdb *pTsdb) { int32_t code = 0; + int32_t lino = 0; if (!pTsdb) return code; SMemTable *pMemTable; code = tsdbMemTableCreate(pTsdb, &pMemTable); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); // lock - code = taosThreadRwlockWrlock(&pTsdb->rwLock); - if (code) { + if ((code = taosThreadRwlockWrlock(&pTsdb->rwLock))) { code = TAOS_SYSTEM_ERROR(code); - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } pTsdb->mem = pMemTable; // unlock - code = taosThreadRwlockUnlock(&pTsdb->rwLock); - if (code) { + if ((code = taosThreadRwlockUnlock(&pTsdb->rwLock))) { code = TAOS_SYSTEM_ERROR(code); - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } - return code; - -_err: - tsdbError("vgId:%d, tsdb begin failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); +_exit: + if (code) { + tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + } return code; } @@ -149,6 +148,7 @@ int32_t tsdbCommit(STsdb *pTsdb) { if (!pTsdb) return 0; int32_t code = 0; + int32_t lino = 0; SCommitter commith; SMemTable *pMemTable = pTsdb->mem; @@ -164,76 +164,74 @@ int32_t tsdbCommit(STsdb *pTsdb) { // start commit code = tsdbStartCommit(pTsdb, &commith); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); // commit impl code = tsdbCommitData(&commith); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); code = tsdbCommitDel(&commith); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); // end commit code = tsdbEndCommit(&commith, 0); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); _exit: - return code; - -_err: - tsdbEndCommit(&commith, code); - tsdbError("vgId:%d, failed to commit since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + if (code) { + tsdbEndCommit(&commith, code); + tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + } return code; } static int32_t tsdbCommitDelStart(SCommitter *pCommitter) { int32_t code = 0; + int32_t lino = 0; STsdb *pTsdb = pCommitter->pTsdb; SMemTable *pMemTable = pTsdb->imem; - pCommitter->aDelIdx = taosArrayInit(0, sizeof(SDelIdx)); - if (pCommitter->aDelIdx == NULL) { + if ((pCommitter->aDelIdx = taosArrayInit(0, sizeof(SDelIdx))) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } - pCommitter->aDelData = taosArrayInit(0, sizeof(SDelData)); - if (pCommitter->aDelData == NULL) { + if ((pCommitter->aDelData = taosArrayInit(0, sizeof(SDelData))) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } - pCommitter->aDelIdxN = taosArrayInit(0, sizeof(SDelIdx)); - if (pCommitter->aDelIdxN == NULL) { + if ((pCommitter->aDelIdxN = taosArrayInit(0, sizeof(SDelIdx))) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } SDelFile *pDelFileR = pCommitter->fs.pDelFile; if (pDelFileR) { code = tsdbDelFReaderOpen(&pCommitter->pDelFReader, pDelFileR, pTsdb); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); code = tsdbReadDelIdx(pCommitter->pDelFReader, pCommitter->aDelIdx); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } // prepare new SDelFile wDelFile = {.commitID = pCommitter->commitID, .size = 0, .offset = 0}; code = tsdbDelFWriterOpen(&pCommitter->pDelFWriter, &wDelFile, pTsdb); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); _exit: - tsdbDebug("vgId:%d, commit del start", TD_VID(pTsdb->pVnode)); - return code; - -_err: - tsdbError("vgId:%d, commit del start failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + if (code) { + tsdbError("vgId:%d, %s failed at %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + } else { + tsdbDebug("vgId:%d, commit del start", TD_VID(pTsdb->pVnode)); + } return code; } static int32_t tsdbCommitTableDel(SCommitter *pCommitter, STbData *pTbData, SDelIdx *pDelIdx) { int32_t code = 0; + int32_t lino = 0; SDelData *pDelData; tb_uid_t suid; tb_uid_t uid; @@ -252,7 +250,7 @@ static int32_t tsdbCommitTableDel(SCommitter *pCommitter, STbData *pTbData, SDel uid = pDelIdx->uid; code = tsdbReadDelData(pCommitter->pDelFReader, pDelIdx, pCommitter->aDelData); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } else { taosArrayClear(pCommitter->aDelData); } @@ -266,62 +264,63 @@ static int32_t tsdbCommitTableDel(SCommitter *pCommitter, STbData *pTbData, SDel for (; pDelData; pDelData = pDelData->pNext) { if (taosArrayPush(pCommitter->aDelData, pDelData) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } } // write code = tsdbWriteDelData(pCommitter->pDelFWriter, pCommitter->aDelData, &delIdx); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); // put delIdx if (taosArrayPush(pCommitter->aDelIdxN, &delIdx) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } _exit: - return code; - -_err: - tsdbError("vgId:%d, commit table del failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); + if (code) { + tsdbError("vgId:%d, %s failed at %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code)); + } return code; } static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) { int32_t code = 0; + int32_t lino = 0; STsdb *pTsdb = pCommitter->pTsdb; code = tsdbWriteDelIdx(pCommitter->pDelFWriter, pCommitter->aDelIdxN); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); code = tsdbUpdateDelFileHdr(pCommitter->pDelFWriter); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); code = tsdbFSUpsertDelFile(&pCommitter->fs, &pCommitter->pDelFWriter->fDel); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); code = tsdbDelFWriterClose(&pCommitter->pDelFWriter, 1); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); if (pCommitter->pDelFReader) { code = tsdbDelFReaderClose(&pCommitter->pDelFReader); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } taosArrayDestroy(pCommitter->aDelIdx); taosArrayDestroy(pCommitter->aDelData); taosArrayDestroy(pCommitter->aDelIdxN); - return code; - -_err: - tsdbError("vgId:%d, commit del end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); +_exit: + if (code) { + tsdbError("vgId:%d, %s failed at %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code)); + } return code; } int32_t tsdbUpdateTableSchema(SMeta *pMeta, int64_t suid, int64_t uid, SSkmInfo *pSkmInfo) { int32_t code = 0; + int32_t lino = 0; if (suid) { if (pSkmInfo->suid == suid) { @@ -336,7 +335,7 @@ int32_t tsdbUpdateTableSchema(SMeta *pMeta, int64_t suid, int64_t uid, SSkmInfo pSkmInfo->uid = uid; tTSchemaDestroy(pSkmInfo->pTSchema); code = metaGetTbTSchemaEx(pMeta, suid, uid, -1, &pSkmInfo->pTSchema); - if (code) goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); _exit: return code; @@ -344,6 +343,7 @@ _exit: static int32_t tsdbCommitterUpdateRowSchema(SCommitter *pCommitter, int64_t suid, int64_t uid, int32_t sver) { int32_t code = 0; + int32_t lino = 0; if (pCommitter->skmRow.pTSchema) { if (pCommitter->skmRow.suid == suid) { @@ -359,9 +359,7 @@ static int32_t tsdbCommitterUpdateRowSchema(SCommitter *pCommitter, int64_t suid pCommitter->skmRow.uid = uid; tTSchemaDestroy(pCommitter->skmRow.pTSchema); code = metaGetTbTSchemaEx(pCommitter->pTsdb->pVnode->pMeta, suid, uid, sver, &pCommitter->skmRow.pTSchema); - if (code) { - goto _exit; - } + TSDB_CHECK_CODE(code, lino, _exit); _exit: return code; @@ -369,6 +367,7 @@ _exit: static int32_t tsdbCommitterNextTableData(SCommitter *pCommitter) { int32_t code = 0; + int32_t lino = 0; ASSERT(pCommitter->dReader.pBlockIdx); @@ -378,7 +377,7 @@ static int32_t tsdbCommitterNextTableData(SCommitter *pCommitter) { (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, pCommitter->dReader.iBlockIdx); code = tsdbReadDataBlk(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock); - if (code) goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); ASSERT(pCommitter->dReader.mBlock.nItem > 0); } else { @@ -391,6 +390,7 @@ _exit: static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) { int32_t code = 0; + int32_t lino = 0; pCommitter->pIter = NULL; tRBTreeCreate(&pCommitter->rbt, tRowInfoCmprFn); @@ -431,14 +431,14 @@ static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) { pIter->iStt = iStt; code = tsdbReadSttBlk(pCommitter->dReader.pReader, iStt, pIter->aSttBlk); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); if (taosArrayGetSize(pIter->aSttBlk) == 0) continue; pIter->iSttBlk = 0; SSttBlk *pSttBlk = (SSttBlk *)taosArrayGet(pIter->aSttBlk, 0); code = tsdbReadSttBlock(pCommitter->dReader.pReader, iStt, pSttBlk, &pIter->bData); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); pIter->iRow = 0; pIter->r.suid = pIter->bData.suid; @@ -460,16 +460,18 @@ static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) { } code = tsdbNextCommitRow(pCommitter); - if (code) goto _err; - - return code; + TSDB_CHECK_CODE(code, lino, _exit); -_err: +_exit: + if (code) { + tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code)); + } return code; } static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { int32_t code = 0; + int32_t lino = 0; STsdb *pTsdb = pCommitter->pTsdb; SDFileSet *pRSet = NULL; @@ -484,17 +486,17 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { pRSet = (SDFileSet *)taosArraySearch(pCommitter->fs.aDFileSet, &tDFileSet, tDFileSetCmprFn, TD_EQ); if (pRSet) { code = tsdbDataFReaderOpen(&pCommitter->dReader.pReader, pTsdb, pRSet); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); // data code = tsdbReadBlockIdx(pCommitter->dReader.pReader, pCommitter->dReader.aBlockIdx); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); pCommitter->dReader.iBlockIdx = 0; if (taosArrayGetSize(pCommitter->dReader.aBlockIdx) > 0) { pCommitter->dReader.pBlockIdx = (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, 0); code = tsdbReadDataBlk(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } else { pCommitter->dReader.pBlockIdx = NULL; } @@ -531,7 +533,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { } wSet.aSttF[wSet.nSttF - 1] = &fStt; code = tsdbDataFWriterOpen(&pCommitter->dWriter.pWriter, pTsdb, &wSet); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); taosArrayClear(pCommitter->dWriter.aBlockIdx); taosArrayClear(pCommitter->dWriter.aSttBlk); @@ -541,18 +543,18 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { // open iter code = tsdbOpenCommitIter(pCommitter); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); _exit: - return code; - -_err: - tsdbError("vgId:%d, commit file data start failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + if (code) { + tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + } return code; } int32_t tsdbWriteDataBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SMapData *mDataBlk, int8_t cmprAlg) { int32_t code = 0; + int32_t lino = 0; if (pBlockData->nRow == 0) return code; @@ -586,24 +588,25 @@ int32_t tsdbWriteDataBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SMapDa dataBlk.nSubBlock++; code = tsdbWriteBlockData(pWriter, pBlockData, &dataBlk.aSubBlock[dataBlk.nSubBlock - 1], ((dataBlk.nSubBlock == 1) && !dataBlk.hasDup) ? &dataBlk.smaInfo : NULL, cmprAlg, 0); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); // put SDataBlk code = tMapDataPutItem(mDataBlk, &dataBlk, tPutDataBlk); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); // clear tBlockDataClear(pBlockData); - return code; - -_err: - tsdbError("vgId:%d tsdb commit data block failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); +_exit: + if (code) { + tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code)); + } return code; } int32_t tsdbWriteSttBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SArray *aSttBlk, int8_t cmprAlg) { int32_t code = 0; + int32_t lino = 0; SSttBlk sstBlk; if (pBlockData->nRow == 0) return code; @@ -626,114 +629,117 @@ int32_t tsdbWriteSttBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SArray // write code = tsdbWriteBlockData(pWriter, pBlockData, &sstBlk.bInfo, NULL, cmprAlg, 1); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); // push SSttBlk if (taosArrayPush(aSttBlk, &sstBlk) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } // clear tBlockDataClear(pBlockData); - return code; - -_err: - tsdbError("vgId:%d tsdb commit last block failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); +_exit: + if (code) { + tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code)); + } return code; } static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) { int32_t code = 0; + int32_t lino = 0; // write aBlockIdx code = tsdbWriteBlockIdx(pCommitter->dWriter.pWriter, pCommitter->dWriter.aBlockIdx); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); // write aSttBlk code = tsdbWriteSttBlk(pCommitter->dWriter.pWriter, pCommitter->dWriter.aSttBlk); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); // update file header code = tsdbUpdateDFileSetHeader(pCommitter->dWriter.pWriter); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); // upsert SDFileSet code = tsdbFSUpsertFSet(&pCommitter->fs, &pCommitter->dWriter.pWriter->wSet); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); // close and sync code = tsdbDataFWriterClose(&pCommitter->dWriter.pWriter, 1); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); if (pCommitter->dReader.pReader) { code = tsdbDataFReaderClose(&pCommitter->dReader.pReader); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } _exit: - return code; - -_err: - tsdbError("vgId:%d, commit file data end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); + if (code) { + tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code)); + } return code; } static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) { int32_t code = 0; + int32_t lino = 0; while (pCommitter->dReader.pBlockIdx && tTABLEIDCmprFn(pCommitter->dReader.pBlockIdx, &toTable) < 0) { SBlockIdx blockIdx = *pCommitter->dReader.pBlockIdx; code = tsdbWriteDataBlk(pCommitter->dWriter.pWriter, &pCommitter->dReader.mBlock, &blockIdx); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); if (taosArrayPush(pCommitter->dWriter.aBlockIdx, &blockIdx) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } code = tsdbCommitterNextTableData(pCommitter); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } - return code; - -_err: - tsdbError("vgId:%d tsdb move commit data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); +_exit: + if (code) { + tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code)); + } return code; } static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter); static int32_t tsdbCommitFileData(SCommitter *pCommitter) { int32_t code = 0; + int32_t lino = 0; STsdb *pTsdb = pCommitter->pTsdb; SMemTable *pMemTable = pTsdb->imem; // commit file data start code = tsdbCommitFileDataStart(pCommitter); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); // impl code = tsdbCommitFileDataImpl(pCommitter); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); // commit file data end code = tsdbCommitFileDataEnd(pCommitter); - if (code) goto _err; - - return code; + TSDB_CHECK_CODE(code, lino, _exit); -_err: - tsdbError("vgId:%d, commit file data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); - tsdbDataFReaderClose(&pCommitter->dReader.pReader); - tsdbDataFWriterClose(&pCommitter->dWriter.pWriter, 0); +_exit: + if (code) { + tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + tsdbDataFReaderClose(&pCommitter->dReader.pReader); + tsdbDataFWriterClose(&pCommitter->dWriter.pWriter, 0); + } return code; } // ---------------------------------------------------------------------------- static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) { int32_t code = 0; + int32_t lino = 0; memset(pCommitter, 0, sizeof(*pCommitter)); ASSERT(pTsdb->mem && pTsdb->imem == NULL); @@ -754,30 +760,31 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) { pCommitter->aTbDataP = tsdbMemTableGetTbDataArray(pTsdb->imem); if (pCommitter->aTbDataP == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } code = tsdbFSCopy(pTsdb, &pCommitter->fs); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); - return code; - -_err: - tsdbError("vgId:%d, tsdb start commit failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); +_exit: + if (code) { + tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + } return code; } static int32_t tsdbCommitDataStart(SCommitter *pCommitter) { int32_t code = 0; + int32_t lino = 0; // reader pCommitter->dReader.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); if (pCommitter->dReader.aBlockIdx == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); } code = tBlockDataCreate(&pCommitter->dReader.bData); - if (code) goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); // merger for (int32_t iStt = 0; iStt < TSDB_MAX_STT_TRIGGER; iStt++) { @@ -785,33 +792,36 @@ static int32_t tsdbCommitDataStart(SCommitter *pCommitter) { pIter->aSttBlk = taosArrayInit(0, sizeof(SSttBlk)); if (pIter->aSttBlk == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); } code = tBlockDataCreate(&pIter->bData); - if (code) goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); } // writer pCommitter->dWriter.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); if (pCommitter->dWriter.aBlockIdx == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); } pCommitter->dWriter.aSttBlk = taosArrayInit(0, sizeof(SSttBlk)); if (pCommitter->dWriter.aSttBlk == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); } code = tBlockDataCreate(&pCommitter->dWriter.bData); - if (code) goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); code = tBlockDataCreate(&pCommitter->dWriter.bDatal); - if (code) goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); _exit: + if (code) { + tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code)); + } return code; } @@ -839,7 +849,9 @@ static void tsdbCommitDataEnd(SCommitter *pCommitter) { } static int32_t tsdbCommitData(SCommitter *pCommitter) { - int32_t code = 0; + int32_t code = 0; + int32_t lino = 0; + STsdb *pTsdb = pCommitter->pTsdb; SMemTable *pMemTable = pTsdb->imem; @@ -848,30 +860,29 @@ static int32_t tsdbCommitData(SCommitter *pCommitter) { // start ==================== code = tsdbCommitDataStart(pCommitter); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); // impl ==================== pCommitter->nextKey = pMemTable->minKey; while (pCommitter->nextKey < TSKEY_MAX) { code = tsdbCommitFileData(pCommitter); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } // end ==================== tsdbCommitDataEnd(pCommitter); _exit: - tsdbInfo("vgId:%d, commit data done, nRow:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nRow); - return code; - -_err: - tsdbCommitDataEnd(pCommitter); - tsdbError("vgId:%d, commit data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + if (code) { + tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + } return code; } static int32_t tsdbCommitDel(SCommitter *pCommitter) { - int32_t code = 0; + int32_t code = 0; + int32_t lino = 0; + STsdb *pTsdb = pCommitter->pTsdb; SMemTable *pMemTable = pTsdb->imem; @@ -882,7 +893,7 @@ static int32_t tsdbCommitDel(SCommitter *pCommitter) { // start code = tsdbCommitDelStart(pCommitter); if (code) { - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } // impl @@ -918,7 +929,7 @@ static int32_t tsdbCommitDel(SCommitter *pCommitter) { _commit_mem_del: code = tsdbCommitTableDel(pCommitter, pTbData, NULL); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); iTbData++; pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData) : NULL; @@ -926,7 +937,7 @@ static int32_t tsdbCommitDel(SCommitter *pCommitter) { _commit_disk_del: code = tsdbCommitTableDel(pCommitter, NULL, pDelIdx); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); iDelIdx++; pDelIdx = (iDelIdx < nDelIdx) ? (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, iDelIdx) : NULL; @@ -934,7 +945,7 @@ static int32_t tsdbCommitDel(SCommitter *pCommitter) { _commit_mem_and_disk_del: code = tsdbCommitTableDel(pCommitter, pTbData, pDelIdx); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); iTbData++; pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData) : NULL; @@ -945,28 +956,28 @@ static int32_t tsdbCommitDel(SCommitter *pCommitter) { // end code = tsdbCommitDelEnd(pCommitter); - if (code) { - goto _err; - } + TSDB_CHECK_CODE(code, lino, _exit); _exit: - tsdbDebug("vgId:%d, commit del done, nDel:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nDel); - return code; - -_err: - tsdbError("vgId:%d, commit del failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + if (code) { + tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + } else { + tsdbDebug("vgId:%d commit del done, nDel:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nDel); + } return code; } static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) { - int32_t code = 0; + int32_t code = 0; + int32_t lino = 0; + STsdb *pTsdb = pCommitter->pTsdb; SMemTable *pMemTable = pTsdb->imem; ASSERT(eno == 0); code = tsdbFSCommit1(pTsdb, &pCommitter->fs); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); // lock taosThreadRwlockWrlock(&pTsdb->rwLock); @@ -975,7 +986,7 @@ static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) { code = tsdbFSCommit2(pTsdb, &pCommitter->fs); if (code) { taosThreadRwlockUnlock(&pTsdb->rwLock); - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } pTsdb->imem = NULL; @@ -987,16 +998,12 @@ static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) { tsdbFSDestroy(&pCommitter->fs); taosArrayDestroy(pCommitter->aTbDataP); - // if (pCommitter->toMerge) { - // code = tsdbMerge(pTsdb); - // if (code) goto _err; - // } - - tsdbInfo("vgId:%d, tsdb end commit", TD_VID(pTsdb->pVnode)); - return code; - -_err: - tsdbError("vgId:%d, tsdb end commit failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); +_exit: + if (code) { + tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + } else { + tsdbInfo("vgId:%d tsdb end commit", TD_VID(pTsdb->pVnode)); + } return code; } @@ -1008,6 +1015,7 @@ static FORCE_INLINE SRowInfo *tsdbGetCommitRow(SCommitter *pCommitter) { static int32_t tsdbNextCommitRow(SCommitter *pCommitter) { int32_t code = 0; + int32_t lino = 0; if (pCommitter->pIter) { SDataIter *pIter = pCommitter->pIter; @@ -1050,7 +1058,7 @@ static int32_t tsdbNextCommitRow(SCommitter *pCommitter) { SSttBlk *pSttBlk = (SSttBlk *)taosArrayGet(pIter->aSttBlk, pIter->iSttBlk); code = tsdbReadSttBlock(pCommitter->dReader.pReader, pIter->iStt, pSttBlk, &pIter->bData); - if (code) goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); pIter->iRow = 0; pIter->r.suid = pIter->bData.suid; @@ -1085,11 +1093,16 @@ static int32_t tsdbNextCommitRow(SCommitter *pCommitter) { } _exit: + if (code) { + tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code)); + } return code; } static int32_t tsdbCommitAheadBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) { - int32_t code = 0; + int32_t code = 0; + int32_t lino = 0; + SBlockData *pBlockData = &pCommitter->dWriter.bData; SRowInfo *pRowInfo = tsdbGetCommitRow(pCommitter); TABLEID id = {.suid = pRowInfo->suid, .uid = pRowInfo->uid}; @@ -1098,13 +1111,13 @@ static int32_t tsdbCommitAheadBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) while (pRowInfo) { ASSERT(pRowInfo->row.type == 0); code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row)); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); code = tBlockDataAppendRow(pBlockData, &pRowInfo->row, pCommitter->skmRow.pTSchema, id.uid); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); code = tsdbNextCommitRow(pCommitter); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); pRowInfo = tsdbGetCommitRow(pCommitter); if (pRowInfo) { @@ -1119,29 +1132,31 @@ static int32_t tsdbCommitAheadBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) if (pBlockData->nRow >= pCommitter->maxRow) { code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBlockData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } } code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBlockData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); - return code; - -_err: - tsdbError("vgId:%d, tsdb commit ahead block failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); +_exit: + if (code) { + tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code)); + } return code; } static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) { - int32_t code = 0; + int32_t code = 0; + int32_t lino = 0; + SRowInfo *pRowInfo = tsdbGetCommitRow(pCommitter); TABLEID id = {.suid = pRowInfo->suid, .uid = pRowInfo->uid}; SBlockData *pBDataR = &pCommitter->dReader.bData; SBlockData *pBDataW = &pCommitter->dWriter.bData; code = tsdbReadDataBlock(pCommitter->dReader.pReader, pDataBlk, pBDataR); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); tBlockDataClear(pBDataW); int32_t iRow = 0; @@ -1152,7 +1167,7 @@ static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) int32_t c = tsdbRowCmprFn(pRow, &pRowInfo->row); if (c < 0) { code = tBlockDataAppendRow(pBDataW, pRow, NULL, id.uid); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); iRow++; if (iRow < pBDataR->nRow) { @@ -1163,13 +1178,13 @@ static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) } else if (c > 0) { ASSERT(pRowInfo->row.type == 0); code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row)); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); code = tBlockDataAppendRow(pBDataW, &pRowInfo->row, pCommitter->skmRow.pTSchema, id.uid); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); code = tsdbNextCommitRow(pCommitter); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); pRowInfo = tsdbGetCommitRow(pCommitter); if (pRowInfo) { @@ -1186,13 +1201,13 @@ static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) if (pBDataW->nRow >= pCommitter->maxRow) { code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBDataW, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } } while (pRow) { code = tBlockDataAppendRow(pBDataW, pRow, NULL, id.uid); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); iRow++; if (iRow < pBDataR->nRow) { @@ -1203,22 +1218,24 @@ static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) if (pBDataW->nRow >= pCommitter->maxRow) { code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBDataW, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } } code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBDataW, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg); - if (code) goto _err; - - return code; + TSDB_CHECK_CODE(code, lino, _exit); -_err: - tsdbError("vgId:%d, tsdb commit merge block failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); +_exit: + if (code) { + tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code)); + } return code; } static int32_t tsdbMergeTableData(SCommitter *pCommitter, TABLEID id) { - int32_t code = 0; + int32_t code = 0; + int32_t lino = 0; + SBlockIdx *pBlockIdx = pCommitter->dReader.pBlockIdx; ASSERT(pBlockIdx == NULL || tTABLEIDCmprFn(pBlockIdx, &id) >= 0); @@ -1237,7 +1254,7 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, TABLEID id) { if (c < 0) { code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pDataBlk, tPutDataBlk); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); iBlock++; if (iBlock < pCommitter->dReader.mBlock.nItem) { @@ -1247,13 +1264,13 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, TABLEID id) { } } else if (c > 0) { code = tsdbCommitAheadBlock(pCommitter, pDataBlk); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); pRowInfo = tsdbGetCommitRow(pCommitter); if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) pRowInfo = NULL; } else { code = tsdbCommitMergeBlock(pCommitter, pDataBlk); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); iBlock++; if (iBlock < pCommitter->dReader.mBlock.nItem) { @@ -1268,7 +1285,7 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, TABLEID id) { while (pDataBlk) { code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pDataBlk, tPutDataBlk); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); iBlock++; if (iBlock < pCommitter->dReader.mBlock.nItem) { @@ -1279,25 +1296,25 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, TABLEID id) { } code = tsdbCommitterNextTableData(pCommitter); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } _exit: - return code; - -_err: - tsdbError("vgId:%d tsdb merge table data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); + if (code) { + tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code)); + } return code; } static int32_t tsdbInitLastBlockIfNeed(SCommitter *pCommitter, TABLEID id) { int32_t code = 0; + int32_t lino = 0; SBlockData *pBDatal = &pCommitter->dWriter.bDatal; if (pBDatal->suid || pBDatal->uid) { if ((pBDatal->suid != id.suid) || (id.suid == 0)) { code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, pBDatal, pCommitter->dWriter.aSttBlk, pCommitter->cmprAlg); - if (code) goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); tBlockDataReset(pBDatal); } } @@ -1306,42 +1323,48 @@ static int32_t tsdbInitLastBlockIfNeed(SCommitter *pCommitter, TABLEID id) { ASSERT(pCommitter->skmTable.suid == id.suid); ASSERT(pCommitter->skmTable.uid == id.uid); code = tBlockDataInit(pBDatal, id.suid, id.suid ? 0 : id.uid, pCommitter->skmTable.pTSchema); - if (code) goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); } _exit: + if (code) { + tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code)); + } return code; } static int32_t tsdbAppendLastBlock(SCommitter *pCommitter) { int32_t code = 0; + int32_t lino = 0; SBlockData *pBData = &pCommitter->dWriter.bData; SBlockData *pBDatal = &pCommitter->dWriter.bDatal; TABLEID id = {.suid = pBData->suid, .uid = pBData->uid}; code = tsdbInitLastBlockIfNeed(pCommitter, id); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); for (int32_t iRow = 0; iRow < pBData->nRow; iRow++) { TSDBROW row = tsdbRowFromBlockData(pBData, iRow); code = tBlockDataAppendRow(pBDatal, &row, NULL, pBData->uid); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); if (pBDatal->nRow >= pCommitter->maxRow) { code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, pBDatal, pCommitter->dWriter.aSttBlk, pCommitter->cmprAlg); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } } - return code; - -_err: +_exit: + if (code) { + tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code)); + } return code; } static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) { int32_t code = 0; + int32_t lino = 0; SRowInfo *pRowInfo = tsdbGetCommitRow(pCommitter); if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) { @@ -1354,7 +1377,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) { if (pCommitter->toLastOnly) { pBData = &pCommitter->dWriter.bDatal; code = tsdbInitLastBlockIfNeed(pCommitter, id); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } else { pBData = &pCommitter->dWriter.bData; ASSERT(pBData->nRow == 0); @@ -1364,15 +1387,15 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) { STSchema *pTSchema = NULL; if (pRowInfo->row.type == 0) { code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row)); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); pTSchema = pCommitter->skmRow.pTSchema; } code = tBlockDataAppendRow(pBData, &pRowInfo->row, pTSchema, id.uid); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); code = tsdbNextCommitRow(pCommitter); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); pRowInfo = tsdbGetCommitRow(pCommitter); if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) { @@ -1382,11 +1405,11 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) { if (pBData->nRow >= pCommitter->maxRow) { if (pCommitter->toLastOnly) { code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, pBData, pCommitter->dWriter.aSttBlk, pCommitter->cmprAlg); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } else { code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } } } @@ -1394,23 +1417,23 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) { if (!pCommitter->toLastOnly && pBData->nRow) { if (pBData->nRow > pCommitter->minRow) { code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } else { code = tsdbAppendLastBlock(pCommitter); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } } _exit: - return code; - -_err: - tsdbError("vgId:%d tsdb commit table data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); + if (code) { + tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code)); + } return code; } static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { int32_t code = 0; + int32_t lino = 0; SRowInfo *pRowInfo; TABLEID id = {0}; @@ -1420,36 +1443,36 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { id.uid = pRowInfo->uid; code = tsdbMoveCommitData(pCommitter, id); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); // start tMapDataReset(&pCommitter->dWriter.mBlock); // impl code = tsdbUpdateTableSchema(pCommitter->pTsdb->pVnode->pMeta, id.suid, id.uid, &pCommitter->skmTable); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); code = tBlockDataInit(&pCommitter->dReader.bData, id.suid, id.uid, pCommitter->skmTable.pTSchema); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); code = tBlockDataInit(&pCommitter->dWriter.bData, id.suid, id.uid, pCommitter->skmTable.pTSchema); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); /* merge with data in .data file */ code = tsdbMergeTableData(pCommitter, id); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); /* handle remain table data */ code = tsdbCommitTableData(pCommitter, id); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); // end if (pCommitter->dWriter.mBlock.nItem > 0) { SBlockIdx blockIdx = {.suid = id.suid, .uid = id.uid}; code = tsdbWriteDataBlk(pCommitter->dWriter.pWriter, &pCommitter->dWriter.mBlock, &blockIdx); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); if (taosArrayPush(pCommitter->dWriter.aBlockIdx, &blockIdx) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } } } @@ -1457,15 +1480,15 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { id.suid = INT64_MAX; id.uid = INT64_MAX; code = tsdbMoveCommitData(pCommitter, id); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bDatal, pCommitter->dWriter.aSttBlk, pCommitter->cmprAlg); - if (code) goto _err; - - return code; + TSDB_CHECK_CODE(code, lino, _exit); -_err: - tsdbError("vgId:%d tsdb commit file data impl failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); +_exit: + if (code) { + tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code)); + } return code; } -- GitLab