diff --git a/include/util/trbtree.h b/include/util/trbtree.h index f6d37e3d753de71fdf312b795935cb9014149f23..7c93c8dd8d65f2c4947019f370c0933e6ad81ee0 100644 --- a/include/util/trbtree.h +++ b/include/util/trbtree.h @@ -26,7 +26,7 @@ typedef struct SRBTree SRBTree; typedef struct SRBTreeNode SRBTreeNode; typedef struct SRBTreeIter SRBTreeIter; -typedef int32_t (*tRBTreeCmprFn)(const void *, const void *); +typedef int32_t (*tRBTreeCmprFn)(const SRBTreeNode *, const SRBTreeNode *); // SRBTree ============================================= #define tRBTreeMin(T) ((T)->min == ((T)->NIL) ? NULL : (T)->min) @@ -36,7 +36,7 @@ void tRBTreeCreate(SRBTree *pTree, tRBTreeCmprFn cmprFn); SRBTreeNode *tRBTreePut(SRBTree *pTree, SRBTreeNode *z); void tRBTreeDrop(SRBTree *pTree, SRBTreeNode *z); SRBTreeNode *tRBTreeDropByKey(SRBTree *pTree, void *pKey); -SRBTreeNode *tRBTreeGet(SRBTree *pTree, void *pKey); +SRBTreeNode *tRBTreeGet(SRBTree *pTree, const SRBTreeNode *pKeyNode); // SRBTreeIter ============================================= #define tRBTreeIterCreate(tree, ascend) \ @@ -53,8 +53,6 @@ struct SRBTreeNode { SRBTreeNode *right; }; -#define RBTREE_NODE_PAYLOAD(N) ((const void *)&(N)[1]) - struct SRBTree { tRBTreeCmprFn cmprFn; int64_t n; diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 995d7a315550aa695b5f3bab816815edaf370800..aca9042261b5cb88ab091cab7465747f122a2fb9 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -396,12 +396,19 @@ _exit: return code; } +static int32_t tDataIterCmprFn(const SRBTreeNode *n1, const SRBTreeNode *n2) { + SDataIter *pIter1 = (SDataIter *)((uint8_t *)n1 - offsetof(SDataIter, n)); + SDataIter *pIter2 = (SDataIter *)((uint8_t *)n2 - offsetof(SDataIter, n)); + + return tRowInfoCmprFn(&pIter1->r, &pIter2->r); +} + static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) { int32_t code = 0; int32_t lino = 0; pCommitter->pIter = NULL; - tRBTreeCreate(&pCommitter->rbt, tRowInfoCmprFn); + tRBTreeCreate(&pCommitter->rbt, tDataIterCmprFn); // memory TSDBKEY tKey = {.ts = pCommitter->minKey, .version = VERSION_MIN}; diff --git a/source/dnode/vnode/src/tsdb/tsdbFS.c b/source/dnode/vnode/src/tsdb/tsdbFS.c index 85514ed5b607814ef441ad239724e045002c44b8..da410532f9383be35d670f76063b57148a1f8b3a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS.c @@ -610,9 +610,6 @@ int32_t tsdbFSRollback(STsdbFS *pFS) { ASSERT(0); return code; - -_err: - return code; } int32_t tsdbFSUpsertDelFile(STsdbFS *pFS, SDelFile *pDelFile) { @@ -866,7 +863,7 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) { nRef = atomic_sub_fetch_32(&fSet.pSmaF->nRef, 1); if (nRef == 0) { tsdbSmaFileName(pTsdb, pSetOld->diskId, pSetOld->fid, fSet.pSmaF, fname); - taosRemoveFile(fname); + (void)taosRemoveFile(fname); taosMemoryFree(fSet.pSmaF); } } else { @@ -877,7 +874,7 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) { // stt if (sameDisk) { if (pSetNew->nSttF > pSetOld->nSttF) { - ASSERT(pSetNew->nSttF = pSetOld->nSttF + 1); + ASSERT(pSetNew->nSttF == pSetOld->nSttF + 1); pSetOld->aSttF[pSetOld->nSttF] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile)); if (pSetOld->aSttF[pSetOld->nSttF] == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -1104,7 +1101,7 @@ void tsdbFSUnref(STsdb *pTsdb, STsdbFS *pFS) { ASSERT(nRef >= 0); if (nRef == 0) { tsdbDelFileName(pTsdb, pFS->pDelFile, fname); - taosRemoveFile(fname); + (void)taosRemoveFile(fname); taosMemoryFree(pFS->pDelFile); } } diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index a6628463f8ff231505052dc3a48d8c7a59a6eaa5..450032d4f2eced2bb84c6d8106b8f13af8bb4bb8 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -559,6 +559,8 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i // backward put first data row.pTSRow = tGetSubmitBlkNext(&blkIter); + if (row.pTSRow == NULL) return code; + key.ts = row.pTSRow->ts; nRow++; tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_BACKWARD); diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index 0b7d1cbaf4b0d21799e1e6bea56168facda996ac..03d4a975f2ed82d2d9a3012784671bba5d0e1ff0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -504,9 +504,9 @@ _exit: SRowInfo *tLDataIterGet(SLDataIter *pIter) { return &pIter->rInfo; } // SMergeTree ================================================= -static FORCE_INLINE int32_t tLDataIterCmprFn(const void *p1, const void *p2) { - SLDataIter *pIter1 = (SLDataIter *)(((uint8_t *)p1) - sizeof(SRBTreeNode)); - SLDataIter *pIter2 = (SLDataIter *)(((uint8_t *)p2) - sizeof(SRBTreeNode)); +static FORCE_INLINE int32_t tLDataIterCmprFn(const SRBTreeNode *p1, const SRBTreeNode *p2) { + SLDataIter *pIter1 = (SLDataIter *)(((uint8_t *)p1) - offsetof(SLDataIter, node)); + SLDataIter *pIter2 = (SLDataIter *)(((uint8_t *)p2) - offsetof(SLDataIter, node)); TSDBKEY key1 = TSDBROW_KEY(&pIter1->rInfo.row); TSDBKEY key2 = TSDBROW_KEY(&pIter2->rInfo.row); @@ -583,7 +583,7 @@ bool tMergeTreeNext(SMergeTree *pMTree) { // compare with min in RB Tree pIter = (SLDataIter *)tRBTreeMin(&pMTree->rbt); if (pMTree->pIter && pIter) { - int32_t c = pMTree->rbt.cmprFn(RBTREE_NODE_PAYLOAD(&pMTree->pIter->node), RBTREE_NODE_PAYLOAD(&pIter->node)); + int32_t c = pMTree->rbt.cmprFn(&pMTree->pIter->node, &pIter->node); if (c > 0) { tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pMTree->pIter); pMTree->pIter = NULL; diff --git a/source/dnode/vnode/src/tsdb/tsdbOpen.c b/source/dnode/vnode/src/tsdb/tsdbOpen.c index e4080ccf1e36bd04032d0c16fe11b094c3b69d31..51978234901a6a4e26f78d0c7d867818d64fa7eb 100644 --- a/source/dnode/vnode/src/tsdb/tsdbOpen.c +++ b/source/dnode/vnode/src/tsdb/tsdbOpen.c @@ -87,9 +87,13 @@ _err: int tsdbClose(STsdb **pTsdb) { if (*pTsdb) { - taosThreadRwlockDestroy(&(*pTsdb)->rwLock); + taosThreadRwlockWrlock(&(*pTsdb)->rwLock); tsdbMemTableDestroy((*pTsdb)->mem); (*pTsdb)->mem = NULL; + taosThreadRwlockUnlock(&(*pTsdb)->rwLock); + + taosThreadRwlockDestroy(&(*pTsdb)->rwLock); + tsdbFSClose(*pTsdb); tsdbCloseCache(*pTsdb); taosMemoryFreeClear(*pTsdb); diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 167b4a104c84d73acb4cde1bf192440645adbf31..911bd6b02c76fa3c5820b081cc1ec5357e572b43 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -18,7 +18,7 @@ // =============== PAGE-WISE FILE =============== static int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **ppFD) { int32_t code = 0; - STsdbFD *pFD; + STsdbFD *pFD = NULL; *ppFD = NULL; @@ -35,6 +35,7 @@ static int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsd pFD->pFD = taosOpenFile(path, flag); if (pFD->pFD == NULL) { code = TAOS_SYSTEM_ERROR(errno); + taosMemoryFree(pFD); goto _exit; } pFD->szPage = szPage; @@ -42,11 +43,15 @@ static int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsd pFD->pBuf = taosMemoryCalloc(1, szPage); if (pFD->pBuf == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; + taosCloseFile(&pFD->pFD); taosMemoryFree(pFD); goto _exit; } if (taosStatFile(path, &pFD->szFile, NULL) < 0) { code = TAOS_SYSTEM_ERROR(errno); + taosMemoryFree(pFD->pBuf); + taosCloseFile(&pFD->pFD); + taosMemoryFree(pFD); goto _exit; } ASSERT(pFD->szFile % szPage == 0); @@ -59,10 +64,12 @@ _exit: static void tsdbCloseFile(STsdbFD **ppFD) { STsdbFD *pFD = *ppFD; - taosMemoryFree(pFD->pBuf); - taosCloseFile(&pFD->pFD); - taosMemoryFree(pFD); - *ppFD = NULL; + if (pFD) { + taosMemoryFree(pFD->pBuf); + taosCloseFile(&pFD->pFD); + taosMemoryFree(pFD); + *ppFD = NULL; + } } static int32_t tsdbWriteFilePage(STsdbFD *pFD) { @@ -443,7 +450,7 @@ int32_t tsdbWriteDataBlk(SDataFWriter *pWriter, SMapData *mDataBlk, SBlockIdx *p pBlockIdx->size = size; pHeadFile->size += size; - tsdbTrace("vgId:%d, write block, file ID:%d commit ID:%d suid:%" PRId64 " uid:%" PRId64 " offset:%" PRId64 + tsdbTrace("vgId:%d, write block, file ID:%d commit ID:%" PRId64 " suid:%" PRId64 " uid:%" PRId64 " offset:%" PRId64 " size:%" PRId64 " nItem:%d", TD_VID(pWriter->pTsdb->pVnode), pWriter->wSet.fid, pHeadFile->commitID, pBlockIdx->suid, pBlockIdx->uid, pBlockIdx->offset, pBlockIdx->size, mDataBlk->nItem); @@ -457,7 +464,7 @@ _err: int32_t tsdbWriteSttBlk(SDataFWriter *pWriter, SArray *aSttBlk) { int32_t code = 0; SSttFile *pSttFile = &pWriter->fStt[pWriter->wSet.nSttF - 1]; - int64_t size; + int64_t size = 0; int64_t n; // check @@ -906,10 +913,6 @@ int32_t tsdbDataFReaderClose(SDataFReader **ppReader) { taosMemoryFree(*ppReader); *ppReader = NULL; return code; - -_err: - tsdbError("vgId:%d, data file reader close failed since %s", TD_VID((*ppReader)->pTsdb->pVnode), tstrerror(code)); - return code; } int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx) { @@ -1289,16 +1292,17 @@ _exit: // SDelFWriter ==================================================== int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb) { int32_t code = 0; + int32_t lino = 0; char fname[TSDB_FILENAME_LEN]; uint8_t hdr[TSDB_FHDR_SIZE] = {0}; - SDelFWriter *pDelFWriter; + SDelFWriter *pDelFWriter = NULL; int64_t n; // alloc pDelFWriter = (SDelFWriter *)taosMemoryCalloc(1, sizeof(*pDelFWriter)); if (pDelFWriter == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } pDelFWriter->pTsdb = pTsdb; pDelFWriter->fDel = *pFile; @@ -1306,21 +1310,28 @@ int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb tsdbDelFileName(pTsdb, pFile, fname); code = tsdbOpenFile(fname, pTsdb->pVnode->config.tsdbPageSize, TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE, &pDelFWriter->pWriteH); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); // update header code = tsdbWriteFile(pDelFWriter->pWriteH, 0, hdr, TSDB_FHDR_SIZE); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); pDelFWriter->fDel.size = TSDB_FHDR_SIZE; pDelFWriter->fDel.offset = 0; *ppWriter = pDelFWriter; - return code; -_err: - tsdbError("vgId:%d, failed to open del file writer since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); - *ppWriter = NULL; +_exit: + if (code) { + if (pDelFWriter) { + taosMemoryFree(pDelFWriter); + tsdbCloseFile(&pDelFWriter->pWriteH); + } + *ppWriter = NULL; + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(errno)); + } else { + *ppWriter = pDelFWriter; + } return code; } @@ -1456,15 +1467,15 @@ struct SDelFReader { int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb) { int32_t code = 0; + int32_t lino = 0; char fname[TSDB_FILENAME_LEN]; - SDelFReader *pDelFReader; - int64_t n; + SDelFReader *pDelFReader = NULL; // alloc pDelFReader = (SDelFReader *)taosMemoryCalloc(1, sizeof(*pDelFReader)); if (pDelFReader == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + goto _exit; } // open impl @@ -1473,14 +1484,18 @@ int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb tsdbDelFileName(pTsdb, pFile, fname); code = tsdbOpenFile(fname, pTsdb->pVnode->config.tsdbPageSize, TD_FILE_READ, &pDelFReader->pReadH); - if (code) goto _err; - - *ppReader = pDelFReader; - return code; + if (code) { + taosMemoryFree(pDelFReader); + goto _exit; + } -_err: - tsdbError("vgId:%d, del file reader open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); - *ppReader = NULL; +_exit: + if (code) { + *ppReader = NULL; + tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + } else { + *ppReader = pDelFReader; + } return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index a928dc3484c67461a8b5a49c39d60e3af4336e8b..423ea2576e7ea96235e6f819aa8c2ec0cb2b492e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -67,6 +67,13 @@ extern int32_t tRowInfoCmprFn(const void* p1, const void* p2); extern int32_t tsdbReadDataBlockEx(SDataFReader* pReader, SDataBlk* pDataBlk, SBlockData* pBlockData); extern int32_t tsdbUpdateTableSchema(SMeta* pMeta, int64_t suid, int64_t uid, SSkmInfo* pSkmInfo); +static int32_t tFDataIterCmprFn(const SRBTreeNode* pNode1, const SRBTreeNode* pNode2) { + SFDataIter* pIter1 = (SFDataIter*)(((uint8_t*)pNode1) - offsetof(SFDataIter, n)); + SFDataIter* pIter2 = (SFDataIter*)(((uint8_t*)pNode2) - offsetof(SFDataIter, n)); + + return tRowInfoCmprFn(&pIter1->rInfo, &pIter2->rInfo); +} + static int32_t tsdbSnapReadOpenFile(STsdbSnapReader* pReader) { int32_t code = 0; @@ -79,7 +86,7 @@ static int32_t tsdbSnapReadOpenFile(STsdbSnapReader* pReader) { if (code) goto _err; pReader->pIter = NULL; - tRBTreeCreate(&pReader->rbt, tRowInfoCmprFn); + tRBTreeCreate(&pReader->rbt, tFDataIterCmprFn); // .data file SFDataIter* pIter = &pReader->aFDataIter[0]; @@ -421,7 +428,7 @@ static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) { n += tPutDelData((*ppData) + n, pDelData); } - tsdbInfo("vgId:%d, vnode snapshot tsdb read del data for %s, suid:%" PRId64 " uid:%d" PRId64 " size:%d", + tsdbInfo("vgId:%d, vnode snapshot tsdb read del data for %s, suid:%" PRId64 " uid:%" PRId64 " size:%d", TD_VID(pTsdb->pVnode), pTsdb->path, pDelIdx->suid, pDelIdx->uid, size); break; @@ -431,7 +438,7 @@ _exit: return code; _err: - tsdbError("vgId:%d, vnode snapshot tsdb read del for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->pVnode, + tsdbError("vgId:%d, vnode snapshot tsdb read del for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path, tstrerror(code)); return code; } @@ -1247,20 +1254,21 @@ _err: // APIs int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWriter** ppWriter) { int32_t code = 0; + int32_t lino = 0; STsdbSnapWriter* pWriter = NULL; // alloc pWriter = (STsdbSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter)); if (pWriter == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } pWriter->pTsdb = pTsdb; pWriter->sver = sver; pWriter->ever = ever; code = tsdbFSCopy(pTsdb, &pWriter->fs); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); // config pWriter->minutes = pTsdb->keepCfg.days; @@ -1272,7 +1280,7 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr // SNAP_DATA_TSDB code = tBlockDataCreate(&pWriter->bData); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); pWriter->fid = INT32_MIN; pWriter->id = (TABLEID){0}; @@ -1280,53 +1288,67 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr pWriter->dReader.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); if (pWriter->dReader.aBlockIdx == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } code = tBlockDataCreate(&pWriter->dReader.bData); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); // Writer pWriter->dWriter.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); if (pWriter->dWriter.aBlockIdx == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } pWriter->dWriter.aSttBlk = taosArrayInit(0, sizeof(SSttBlk)); if (pWriter->dWriter.aSttBlk == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } code = tBlockDataCreate(&pWriter->dWriter.bData); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); code = tBlockDataCreate(&pWriter->dWriter.sData); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); // SNAP_DATA_DEL pWriter->aDelIdxR = taosArrayInit(0, sizeof(SDelIdx)); if (pWriter->aDelIdxR == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } pWriter->aDelData = taosArrayInit(0, sizeof(SDelData)); if (pWriter->aDelData == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } pWriter->aDelIdxW = taosArrayInit(0, sizeof(SDelIdx)); if (pWriter->aDelIdxW == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } - *ppWriter = pWriter; - - tsdbInfo("vgId:%d, tsdb snapshot writer open for %s succeed", TD_VID(pTsdb->pVnode), pTsdb->path); - return code; - -_err: - tsdbError("vgId:%d, tsdb snapshot writer open for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path, - tstrerror(code)); - *ppWriter = NULL; +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + *ppWriter = NULL; + + if (pWriter) { + if (pWriter->aDelIdxW) taosArrayDestroy(pWriter->aDelIdxW); + if (pWriter->aDelData) taosArrayDestroy(pWriter->aDelData); + if (pWriter->aDelIdxR) taosArrayDestroy(pWriter->aDelIdxR); + tBlockDataDestroy(&pWriter->dWriter.sData, 1); + tBlockDataDestroy(&pWriter->dWriter.bData, 1); + if (pWriter->dWriter.aSttBlk) taosArrayDestroy(pWriter->dWriter.aSttBlk); + if (pWriter->dWriter.aBlockIdx) taosArrayDestroy(pWriter->dWriter.aBlockIdx); + tBlockDataDestroy(&pWriter->dReader.bData, 1); + if (pWriter->dReader.aBlockIdx) taosArrayDestroy(pWriter->dReader.aBlockIdx); + tBlockDataDestroy(&pWriter->bData, 1); + tsdbFSDestroy(&pWriter->fs); + taosMemoryFree(pWriter); + } + } else { + tsdbDebug("vgId:%d, tsdb snapshot writer open for %s succeed", TD_VID(pTsdb->pVnode), pTsdb->path); + *ppWriter = pWriter; + } return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbWrite.c b/source/dnode/vnode/src/tsdb/tsdbWrite.c index 0a9fbf92a4bf62326aa9755b827b83d0d510d2f7..f38802aee78021c158a29fed44e0fc14e06fc482 100644 --- a/source/dnode/vnode/src/tsdb/tsdbWrite.c +++ b/source/dnode/vnode/src/tsdb/tsdbWrite.c @@ -34,7 +34,9 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq *pMsg, SSubmitRsp * } // loop to insert - tInitSubmitMsgIter(pMsg, &msgIter); + if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) { + return -1; + } while (true) { SSubmitBlkRsp r = {0}; tGetSubmitMsgNext(&msgIter, &pBlock); diff --git a/source/dnode/vnode/src/vnd/vnodeBufPool.c b/source/dnode/vnode/src/vnd/vnodeBufPool.c index 3c0a041197d372f2540e11e8edb162b15c39c2e7..730bd264a7be40551910f9f08d4e6ed0ed994954 100644 --- a/source/dnode/vnode/src/vnd/vnodeBufPool.c +++ b/source/dnode/vnode/src/vnd/vnodeBufPool.c @@ -141,17 +141,17 @@ void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) { } void vnodeBufPoolFree(SVBufPool *pPool, void *p) { - uint8_t *ptr = (uint8_t *)p; - SVBufPoolNode *pNode; + // uint8_t *ptr = (uint8_t *)p; + // SVBufPoolNode *pNode; - if (ptr < pPool->node.data || ptr >= pPool->node.data + pPool->node.size) { - pNode = &((SVBufPoolNode *)p)[-1]; - *pNode->pnext = pNode->prev; - pNode->prev->pnext = pNode->pnext; + // if (ptr < pPool->node.data || ptr >= pPool->node.data + pPool->node.size) { + // pNode = &((SVBufPoolNode *)p)[-1]; + // *pNode->pnext = pNode->prev; + // pNode->prev->pnext = pNode->pnext; - pPool->size = pPool->size - sizeof(*pNode) - pNode->size; - taosMemoryFree(pNode); - } + // pPool->size = pPool->size - sizeof(*pNode) - pNode->size; + // taosMemoryFree(pNode); + // } } void vnodeBufPoolRef(SVBufPool *pPool) { diff --git a/source/dnode/vnode/src/vnd/vnodeModule.c b/source/dnode/vnode/src/vnd/vnodeModule.c index 314c0e7390a6616e8bceba613cf4a57687db8fef..9fe37505add7d2ec768233637f9da2c55f7124fb 100644 --- a/source/dnode/vnode/src/vnd/vnodeModule.c +++ b/source/dnode/vnode/src/vnd/vnodeModule.c @@ -46,11 +46,17 @@ int vnodeInit(int nthreads) { return 0; } - vnodeGlobal.stop = 0; + taosThreadMutexInit(&vnodeGlobal.mutex, NULL); + taosThreadCondInit(&vnodeGlobal.hasTask, NULL); + taosThreadMutexLock(&vnodeGlobal.mutex); + + vnodeGlobal.stop = 0; vnodeGlobal.queue.next = &vnodeGlobal.queue; vnodeGlobal.queue.prev = &vnodeGlobal.queue; + taosThreadMutexUnlock(&(vnodeGlobal.mutex)); + vnodeGlobal.nthreads = nthreads; vnodeGlobal.threads = taosMemoryCalloc(nthreads, sizeof(TdThread)); if (vnodeGlobal.threads == NULL) { @@ -59,9 +65,6 @@ int vnodeInit(int nthreads) { return -1; } - taosThreadMutexInit(&vnodeGlobal.mutex, NULL); - taosThreadCondInit(&vnodeGlobal.hasTask, NULL); - for (int i = 0; i < nthreads; i++) { taosThreadCreate(&(vnodeGlobal.threads[i]), NULL, loop, NULL); } diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index de3af7cde6933d1bce189f3b3c8ba81b52293056..298653d3edfa519f565f32adb642fca1c364d303 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -38,7 +38,7 @@ int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) { if (taosMkDir(path)) { return TAOS_SYSTEM_ERROR(errno); } - strcpy(dir, path); + snprintf(dir, TSDB_FILENAME_LEN, "%s", path); } if (pCfg) { @@ -51,7 +51,7 @@ int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) { info.state.commitID = 0; if (vnodeSaveInfo(dir, &info) < 0 || vnodeCommitInfo(dir, &info) < 0) { - vError("vgId:%d, failed to save vnode config since %s", pCfg->vgId, tstrerror(terrno)); + vError("vgId:%d, failed to save vnode config since %s", pCfg ? pCfg->vgId : 0, tstrerror(terrno)); return -1; } diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index 08c3a34699b6f9f83b366e42f77e42a6094f09ed..9cbb1b8af491ea1567c8d9a3ada47614749503bc 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -166,7 +166,7 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) if (*ppData) { goto _exit; } else { - pReader->tqHandleDone = 1; + pReader->tqOffsetDone = 1; code = tqOffsetReaderClose(&pReader->pTqOffsetReader); if (code) goto _err; } @@ -219,7 +219,7 @@ _exit: return code; _err: - vError("vgId:% vnode snapshot read failed since %s", TD_VID(pReader->pVnode), tstrerror(code)); + vError("vgId:%d vnode snapshot read failed since %s", TD_VID(pReader->pVnode), tstrerror(code)); return code; } @@ -260,7 +260,10 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWr // commit it code = vnodeCommit(pVnode); - if (code) goto _err; + if (code) { + taosMemoryFree(pWriter); + goto _err; + } // inc commit ID pVnode->state.commitID++; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 131327976f06d7a610a2319b1f4a40e2c097c51f..b83f790a3d44625f6361156eb30037131580339b 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -68,7 +68,10 @@ int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) { int64_t ctime = taosGetTimestampMs(); tb_uid_t uid; - tInitSubmitMsgIter(pSubmitReq, &msgIter); + if (tInitSubmitMsgIter(pSubmitReq, &msgIter) < 0) { + code = terrno; + goto _err; + } for (;;) { tGetSubmitMsgNext(&msgIter, &pBlock); diff --git a/source/libs/tdb/src/db/tdbPager.c b/source/libs/tdb/src/db/tdbPager.c index 04711eb6a0af802f61cc39da3fba1282dcaf0c83..58be8642d7db5429d5776a3e9fb7f2cfed8e1dd8 100644 --- a/source/libs/tdb/src/db/tdbPager.c +++ b/source/libs/tdb/src/db/tdbPager.c @@ -34,9 +34,9 @@ static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage static int tdbPagerWritePageToJournal(SPager *pPager, SPage *pPage); static int tdbPagerWritePageToDB(SPager *pPager, SPage *pPage); -static FORCE_INLINE int32_t pageCmpFn(const void *lhs, const void *rhs) { - SPage *pPageL = (SPage *)(((uint8_t *)lhs) - sizeof(SRBTreeNode)); - SPage *pPageR = (SPage *)(((uint8_t *)rhs) - sizeof(SRBTreeNode)); +static FORCE_INLINE int32_t pageCmpFn(const SRBTreeNode *lhs, const SRBTreeNode *rhs) { + SPage *pPageL = (SPage *)(((uint8_t *)lhs) - offsetof(SPage, node)); + SPage *pPageR = (SPage *)(((uint8_t *)rhs) - offsetof(SPage, node)); SPgno pgnoL = TDB_PAGE_PGNO(pPageL); SPgno pgnoR = TDB_PAGE_PGNO(pPageR); diff --git a/source/util/src/trbtree.c b/source/util/src/trbtree.c index 65f1bac60aa51c48fe65895dbe0ded241d22590d..cbbf53d8f1c18e4770c513bc678fc68646268ff7 100644 --- a/source/util/src/trbtree.c +++ b/source/util/src/trbtree.c @@ -219,7 +219,7 @@ SRBTreeNode *tRBTreePut(SRBTree *pTree, SRBTreeNode *z) { while (temp != pTree->NIL) { y = temp; - int32_t c = pTree->cmprFn(RBTREE_NODE_PAYLOAD(z), RBTREE_NODE_PAYLOAD(temp)); + int32_t c = pTree->cmprFn(z, temp); if (c < 0) { temp = temp->left; } else if (c > 0) { @@ -232,7 +232,7 @@ SRBTreeNode *tRBTreePut(SRBTree *pTree, SRBTreeNode *z) { if (y == pTree->NIL) { pTree->root = z; - } else if (pTree->cmprFn(RBTREE_NODE_PAYLOAD(z), RBTREE_NODE_PAYLOAD(y)) < 0) { + } else if (pTree->cmprFn(z, y) < 0) { y->left = z; } else { y->right = z; @@ -245,10 +245,10 @@ SRBTreeNode *tRBTreePut(SRBTree *pTree, SRBTreeNode *z) { tRBTreePutFix(pTree, z); // update min/max node - if (pTree->min == pTree->NIL || pTree->cmprFn(RBTREE_NODE_PAYLOAD(pTree->min), RBTREE_NODE_PAYLOAD(z)) > 0) { + if (pTree->min == pTree->NIL || pTree->cmprFn(pTree->min, z) > 0) { pTree->min = z; } - if (pTree->max == pTree->NIL || pTree->cmprFn(RBTREE_NODE_PAYLOAD(pTree->max), RBTREE_NODE_PAYLOAD(z)) < 0) { + if (pTree->max == pTree->NIL || pTree->cmprFn(pTree->max, z) < 0) { pTree->max = z; } pTree->n++; @@ -309,11 +309,11 @@ SRBTreeNode *tRBTreeDropByKey(SRBTree *pTree, void *pKey) { return pNode; } -SRBTreeNode *tRBTreeGet(SRBTree *pTree, void *pKey) { +SRBTreeNode *tRBTreeGet(SRBTree *pTree, const SRBTreeNode *pKeyNode) { SRBTreeNode *pNode = pTree->root; while (pNode != pTree->NIL) { - int32_t c = pTree->cmprFn(pKey, RBTREE_NODE_PAYLOAD(pNode)); + int32_t c = pTree->cmprFn(pKeyNode, pNode); if (c < 0) { pNode = pNode->left; diff --git a/source/util/src/tskiplist.c b/source/util/src/tskiplist.c index 4ce668e6bab22abc74f509114ab84b0693469634..d93e9fc5695904995041307b1ec333f780ff2b6f 100644 --- a/source/util/src/tskiplist.c +++ b/source/util/src/tskiplist.c @@ -145,7 +145,10 @@ void tSkipListPutBatchByIter(SSkipList *pSkipList, void *iter, iter_next_fn_t it tSkipListWLock(pSkipList); void *pData = iterate(iter); - if (pData == NULL) return; + if (pData == NULL) { + tSkipListUnlock(pSkipList); + return; + } // backward to put the first data hasDup = tSkipListGetPosToPut(pSkipList, backward, pData);