未验证 提交 e20dbd56 编写于 作者: H Hongze Cheng 提交者: GitHub

Merge pull request #17352 from taosdata/fix/hzcheng_coverity_scan

fix: converity scan problem
...@@ -26,7 +26,7 @@ typedef struct SRBTree SRBTree; ...@@ -26,7 +26,7 @@ typedef struct SRBTree SRBTree;
typedef struct SRBTreeNode SRBTreeNode; typedef struct SRBTreeNode SRBTreeNode;
typedef struct SRBTreeIter SRBTreeIter; typedef struct SRBTreeIter SRBTreeIter;
typedef int32_t (*tRBTreeCmprFn)(const void *, const void *); typedef int32_t (*tRBTreeCmprFn)(const SRBTreeNode *, const SRBTreeNode *);
// SRBTree ============================================= // SRBTree =============================================
#define tRBTreeMin(T) ((T)->min == ((T)->NIL) ? NULL : (T)->min) #define tRBTreeMin(T) ((T)->min == ((T)->NIL) ? NULL : (T)->min)
...@@ -36,7 +36,7 @@ void tRBTreeCreate(SRBTree *pTree, tRBTreeCmprFn cmprFn); ...@@ -36,7 +36,7 @@ void tRBTreeCreate(SRBTree *pTree, tRBTreeCmprFn cmprFn);
SRBTreeNode *tRBTreePut(SRBTree *pTree, SRBTreeNode *z); SRBTreeNode *tRBTreePut(SRBTree *pTree, SRBTreeNode *z);
void tRBTreeDrop(SRBTree *pTree, SRBTreeNode *z); void tRBTreeDrop(SRBTree *pTree, SRBTreeNode *z);
SRBTreeNode *tRBTreeDropByKey(SRBTree *pTree, void *pKey); SRBTreeNode *tRBTreeDropByKey(SRBTree *pTree, void *pKey);
SRBTreeNode *tRBTreeGet(SRBTree *pTree, void *pKey); SRBTreeNode *tRBTreeGet(SRBTree *pTree, const SRBTreeNode *pKeyNode);
// SRBTreeIter ============================================= // SRBTreeIter =============================================
#define tRBTreeIterCreate(tree, ascend) \ #define tRBTreeIterCreate(tree, ascend) \
...@@ -53,8 +53,6 @@ struct SRBTreeNode { ...@@ -53,8 +53,6 @@ struct SRBTreeNode {
SRBTreeNode *right; SRBTreeNode *right;
}; };
#define RBTREE_NODE_PAYLOAD(N) ((const void *)&(N)[1])
struct SRBTree { struct SRBTree {
tRBTreeCmprFn cmprFn; tRBTreeCmprFn cmprFn;
int64_t n; int64_t n;
......
...@@ -396,12 +396,19 @@ _exit: ...@@ -396,12 +396,19 @@ _exit:
return code; return code;
} }
static int32_t tDataIterCmprFn(const SRBTreeNode *n1, const SRBTreeNode *n2) {
SDataIter *pIter1 = (SDataIter *)((uint8_t *)n1 - offsetof(SDataIter, n));
SDataIter *pIter2 = (SDataIter *)((uint8_t *)n2 - offsetof(SDataIter, n));
return tRowInfoCmprFn(&pIter1->r, &pIter2->r);
}
static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) { static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
pCommitter->pIter = NULL; pCommitter->pIter = NULL;
tRBTreeCreate(&pCommitter->rbt, tRowInfoCmprFn); tRBTreeCreate(&pCommitter->rbt, tDataIterCmprFn);
// memory // memory
TSDBKEY tKey = {.ts = pCommitter->minKey, .version = VERSION_MIN}; TSDBKEY tKey = {.ts = pCommitter->minKey, .version = VERSION_MIN};
......
...@@ -610,9 +610,6 @@ int32_t tsdbFSRollback(STsdbFS *pFS) { ...@@ -610,9 +610,6 @@ int32_t tsdbFSRollback(STsdbFS *pFS) {
ASSERT(0); ASSERT(0);
return code; return code;
_err:
return code;
} }
int32_t tsdbFSUpsertDelFile(STsdbFS *pFS, SDelFile *pDelFile) { int32_t tsdbFSUpsertDelFile(STsdbFS *pFS, SDelFile *pDelFile) {
...@@ -866,7 +863,7 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) { ...@@ -866,7 +863,7 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) {
nRef = atomic_sub_fetch_32(&fSet.pSmaF->nRef, 1); nRef = atomic_sub_fetch_32(&fSet.pSmaF->nRef, 1);
if (nRef == 0) { if (nRef == 0) {
tsdbSmaFileName(pTsdb, pSetOld->diskId, pSetOld->fid, fSet.pSmaF, fname); tsdbSmaFileName(pTsdb, pSetOld->diskId, pSetOld->fid, fSet.pSmaF, fname);
taosRemoveFile(fname); (void)taosRemoveFile(fname);
taosMemoryFree(fSet.pSmaF); taosMemoryFree(fSet.pSmaF);
} }
} else { } else {
...@@ -877,7 +874,7 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) { ...@@ -877,7 +874,7 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) {
// stt // stt
if (sameDisk) { if (sameDisk) {
if (pSetNew->nSttF > pSetOld->nSttF) { if (pSetNew->nSttF > pSetOld->nSttF) {
ASSERT(pSetNew->nSttF = pSetOld->nSttF + 1); ASSERT(pSetNew->nSttF == pSetOld->nSttF + 1);
pSetOld->aSttF[pSetOld->nSttF] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile)); pSetOld->aSttF[pSetOld->nSttF] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
if (pSetOld->aSttF[pSetOld->nSttF] == NULL) { if (pSetOld->aSttF[pSetOld->nSttF] == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
...@@ -1104,7 +1101,7 @@ void tsdbFSUnref(STsdb *pTsdb, STsdbFS *pFS) { ...@@ -1104,7 +1101,7 @@ void tsdbFSUnref(STsdb *pTsdb, STsdbFS *pFS) {
ASSERT(nRef >= 0); ASSERT(nRef >= 0);
if (nRef == 0) { if (nRef == 0) {
tsdbDelFileName(pTsdb, pFS->pDelFile, fname); tsdbDelFileName(pTsdb, pFS->pDelFile, fname);
taosRemoveFile(fname); (void)taosRemoveFile(fname);
taosMemoryFree(pFS->pDelFile); taosMemoryFree(pFS->pDelFile);
} }
} }
......
...@@ -559,6 +559,8 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i ...@@ -559,6 +559,8 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i
// backward put first data // backward put first data
row.pTSRow = tGetSubmitBlkNext(&blkIter); row.pTSRow = tGetSubmitBlkNext(&blkIter);
if (row.pTSRow == NULL) return code;
key.ts = row.pTSRow->ts; key.ts = row.pTSRow->ts;
nRow++; nRow++;
tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_BACKWARD); tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_BACKWARD);
......
...@@ -504,9 +504,9 @@ _exit: ...@@ -504,9 +504,9 @@ _exit:
SRowInfo *tLDataIterGet(SLDataIter *pIter) { return &pIter->rInfo; } SRowInfo *tLDataIterGet(SLDataIter *pIter) { return &pIter->rInfo; }
// SMergeTree ================================================= // SMergeTree =================================================
static FORCE_INLINE int32_t tLDataIterCmprFn(const void *p1, const void *p2) { static FORCE_INLINE int32_t tLDataIterCmprFn(const SRBTreeNode *p1, const SRBTreeNode *p2) {
SLDataIter *pIter1 = (SLDataIter *)(((uint8_t *)p1) - sizeof(SRBTreeNode)); SLDataIter *pIter1 = (SLDataIter *)(((uint8_t *)p1) - offsetof(SLDataIter, node));
SLDataIter *pIter2 = (SLDataIter *)(((uint8_t *)p2) - sizeof(SRBTreeNode)); SLDataIter *pIter2 = (SLDataIter *)(((uint8_t *)p2) - offsetof(SLDataIter, node));
TSDBKEY key1 = TSDBROW_KEY(&pIter1->rInfo.row); TSDBKEY key1 = TSDBROW_KEY(&pIter1->rInfo.row);
TSDBKEY key2 = TSDBROW_KEY(&pIter2->rInfo.row); TSDBKEY key2 = TSDBROW_KEY(&pIter2->rInfo.row);
...@@ -583,7 +583,7 @@ bool tMergeTreeNext(SMergeTree *pMTree) { ...@@ -583,7 +583,7 @@ bool tMergeTreeNext(SMergeTree *pMTree) {
// compare with min in RB Tree // compare with min in RB Tree
pIter = (SLDataIter *)tRBTreeMin(&pMTree->rbt); pIter = (SLDataIter *)tRBTreeMin(&pMTree->rbt);
if (pMTree->pIter && pIter) { if (pMTree->pIter && pIter) {
int32_t c = pMTree->rbt.cmprFn(RBTREE_NODE_PAYLOAD(&pMTree->pIter->node), RBTREE_NODE_PAYLOAD(&pIter->node)); int32_t c = pMTree->rbt.cmprFn(&pMTree->pIter->node, &pIter->node);
if (c > 0) { if (c > 0) {
tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pMTree->pIter); tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pMTree->pIter);
pMTree->pIter = NULL; pMTree->pIter = NULL;
......
...@@ -87,9 +87,13 @@ _err: ...@@ -87,9 +87,13 @@ _err:
int tsdbClose(STsdb **pTsdb) { int tsdbClose(STsdb **pTsdb) {
if (*pTsdb) { if (*pTsdb) {
taosThreadRwlockDestroy(&(*pTsdb)->rwLock); taosThreadRwlockWrlock(&(*pTsdb)->rwLock);
tsdbMemTableDestroy((*pTsdb)->mem); tsdbMemTableDestroy((*pTsdb)->mem);
(*pTsdb)->mem = NULL; (*pTsdb)->mem = NULL;
taosThreadRwlockUnlock(&(*pTsdb)->rwLock);
taosThreadRwlockDestroy(&(*pTsdb)->rwLock);
tsdbFSClose(*pTsdb); tsdbFSClose(*pTsdb);
tsdbCloseCache(*pTsdb); tsdbCloseCache(*pTsdb);
taosMemoryFreeClear(*pTsdb); taosMemoryFreeClear(*pTsdb);
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
// =============== PAGE-WISE FILE =============== // =============== PAGE-WISE FILE ===============
static int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **ppFD) { static int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **ppFD) {
int32_t code = 0; int32_t code = 0;
STsdbFD *pFD; STsdbFD *pFD = NULL;
*ppFD = NULL; *ppFD = NULL;
...@@ -35,6 +35,7 @@ static int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsd ...@@ -35,6 +35,7 @@ static int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsd
pFD->pFD = taosOpenFile(path, flag); pFD->pFD = taosOpenFile(path, flag);
if (pFD->pFD == NULL) { if (pFD->pFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
taosMemoryFree(pFD);
goto _exit; goto _exit;
} }
pFD->szPage = szPage; pFD->szPage = szPage;
...@@ -42,11 +43,15 @@ static int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsd ...@@ -42,11 +43,15 @@ static int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsd
pFD->pBuf = taosMemoryCalloc(1, szPage); pFD->pBuf = taosMemoryCalloc(1, szPage);
if (pFD->pBuf == NULL) { if (pFD->pBuf == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
taosCloseFile(&pFD->pFD);
taosMemoryFree(pFD); taosMemoryFree(pFD);
goto _exit; goto _exit;
} }
if (taosStatFile(path, &pFD->szFile, NULL) < 0) { if (taosStatFile(path, &pFD->szFile, NULL) < 0) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
taosMemoryFree(pFD->pBuf);
taosCloseFile(&pFD->pFD);
taosMemoryFree(pFD);
goto _exit; goto _exit;
} }
ASSERT(pFD->szFile % szPage == 0); ASSERT(pFD->szFile % szPage == 0);
...@@ -59,10 +64,12 @@ _exit: ...@@ -59,10 +64,12 @@ _exit:
static void tsdbCloseFile(STsdbFD **ppFD) { static void tsdbCloseFile(STsdbFD **ppFD) {
STsdbFD *pFD = *ppFD; STsdbFD *pFD = *ppFD;
taosMemoryFree(pFD->pBuf); if (pFD) {
taosCloseFile(&pFD->pFD); taosMemoryFree(pFD->pBuf);
taosMemoryFree(pFD); taosCloseFile(&pFD->pFD);
*ppFD = NULL; taosMemoryFree(pFD);
*ppFD = NULL;
}
} }
static int32_t tsdbWriteFilePage(STsdbFD *pFD) { static int32_t tsdbWriteFilePage(STsdbFD *pFD) {
...@@ -443,7 +450,7 @@ int32_t tsdbWriteDataBlk(SDataFWriter *pWriter, SMapData *mDataBlk, SBlockIdx *p ...@@ -443,7 +450,7 @@ int32_t tsdbWriteDataBlk(SDataFWriter *pWriter, SMapData *mDataBlk, SBlockIdx *p
pBlockIdx->size = size; pBlockIdx->size = size;
pHeadFile->size += size; pHeadFile->size += size;
tsdbTrace("vgId:%d, write block, file ID:%d commit ID:%d suid:%" PRId64 " uid:%" PRId64 " offset:%" PRId64 tsdbTrace("vgId:%d, write block, file ID:%d commit ID:%" PRId64 " suid:%" PRId64 " uid:%" PRId64 " offset:%" PRId64
" size:%" PRId64 " nItem:%d", " size:%" PRId64 " nItem:%d",
TD_VID(pWriter->pTsdb->pVnode), pWriter->wSet.fid, pHeadFile->commitID, pBlockIdx->suid, pBlockIdx->uid, TD_VID(pWriter->pTsdb->pVnode), pWriter->wSet.fid, pHeadFile->commitID, pBlockIdx->suid, pBlockIdx->uid,
pBlockIdx->offset, pBlockIdx->size, mDataBlk->nItem); pBlockIdx->offset, pBlockIdx->size, mDataBlk->nItem);
...@@ -457,7 +464,7 @@ _err: ...@@ -457,7 +464,7 @@ _err:
int32_t tsdbWriteSttBlk(SDataFWriter *pWriter, SArray *aSttBlk) { int32_t tsdbWriteSttBlk(SDataFWriter *pWriter, SArray *aSttBlk) {
int32_t code = 0; int32_t code = 0;
SSttFile *pSttFile = &pWriter->fStt[pWriter->wSet.nSttF - 1]; SSttFile *pSttFile = &pWriter->fStt[pWriter->wSet.nSttF - 1];
int64_t size; int64_t size = 0;
int64_t n; int64_t n;
// check // check
...@@ -906,10 +913,6 @@ int32_t tsdbDataFReaderClose(SDataFReader **ppReader) { ...@@ -906,10 +913,6 @@ int32_t tsdbDataFReaderClose(SDataFReader **ppReader) {
taosMemoryFree(*ppReader); taosMemoryFree(*ppReader);
*ppReader = NULL; *ppReader = NULL;
return code; return code;
_err:
tsdbError("vgId:%d, data file reader close failed since %s", TD_VID((*ppReader)->pTsdb->pVnode), tstrerror(code));
return code;
} }
int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx) { int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx) {
...@@ -1289,16 +1292,17 @@ _exit: ...@@ -1289,16 +1292,17 @@ _exit:
// SDelFWriter ==================================================== // SDelFWriter ====================================================
int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb) { int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
char fname[TSDB_FILENAME_LEN]; char fname[TSDB_FILENAME_LEN];
uint8_t hdr[TSDB_FHDR_SIZE] = {0}; uint8_t hdr[TSDB_FHDR_SIZE] = {0};
SDelFWriter *pDelFWriter; SDelFWriter *pDelFWriter = NULL;
int64_t n; int64_t n;
// alloc // alloc
pDelFWriter = (SDelFWriter *)taosMemoryCalloc(1, sizeof(*pDelFWriter)); pDelFWriter = (SDelFWriter *)taosMemoryCalloc(1, sizeof(*pDelFWriter));
if (pDelFWriter == NULL) { if (pDelFWriter == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
pDelFWriter->pTsdb = pTsdb; pDelFWriter->pTsdb = pTsdb;
pDelFWriter->fDel = *pFile; pDelFWriter->fDel = *pFile;
...@@ -1306,21 +1310,28 @@ int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb ...@@ -1306,21 +1310,28 @@ int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb
tsdbDelFileName(pTsdb, pFile, fname); tsdbDelFileName(pTsdb, pFile, fname);
code = tsdbOpenFile(fname, pTsdb->pVnode->config.tsdbPageSize, TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE, code = tsdbOpenFile(fname, pTsdb->pVnode->config.tsdbPageSize, TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE,
&pDelFWriter->pWriteH); &pDelFWriter->pWriteH);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
// update header // update header
code = tsdbWriteFile(pDelFWriter->pWriteH, 0, hdr, TSDB_FHDR_SIZE); code = tsdbWriteFile(pDelFWriter->pWriteH, 0, hdr, TSDB_FHDR_SIZE);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
pDelFWriter->fDel.size = TSDB_FHDR_SIZE; pDelFWriter->fDel.size = TSDB_FHDR_SIZE;
pDelFWriter->fDel.offset = 0; pDelFWriter->fDel.offset = 0;
*ppWriter = pDelFWriter; *ppWriter = pDelFWriter;
return code;
_err: _exit:
tsdbError("vgId:%d, failed to open del file writer since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); if (code) {
*ppWriter = NULL; if (pDelFWriter) {
taosMemoryFree(pDelFWriter);
tsdbCloseFile(&pDelFWriter->pWriteH);
}
*ppWriter = NULL;
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(errno));
} else {
*ppWriter = pDelFWriter;
}
return code; return code;
} }
...@@ -1456,15 +1467,15 @@ struct SDelFReader { ...@@ -1456,15 +1467,15 @@ struct SDelFReader {
int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb) { int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
char fname[TSDB_FILENAME_LEN]; char fname[TSDB_FILENAME_LEN];
SDelFReader *pDelFReader; SDelFReader *pDelFReader = NULL;
int64_t n;
// alloc // alloc
pDelFReader = (SDelFReader *)taosMemoryCalloc(1, sizeof(*pDelFReader)); pDelFReader = (SDelFReader *)taosMemoryCalloc(1, sizeof(*pDelFReader));
if (pDelFReader == NULL) { if (pDelFReader == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _exit;
} }
// open impl // open impl
...@@ -1473,14 +1484,18 @@ int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb ...@@ -1473,14 +1484,18 @@ int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb
tsdbDelFileName(pTsdb, pFile, fname); tsdbDelFileName(pTsdb, pFile, fname);
code = tsdbOpenFile(fname, pTsdb->pVnode->config.tsdbPageSize, TD_FILE_READ, &pDelFReader->pReadH); code = tsdbOpenFile(fname, pTsdb->pVnode->config.tsdbPageSize, TD_FILE_READ, &pDelFReader->pReadH);
if (code) goto _err; if (code) {
taosMemoryFree(pDelFReader);
*ppReader = pDelFReader; goto _exit;
return code; }
_err: _exit:
tsdbError("vgId:%d, del file reader open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); if (code) {
*ppReader = NULL; *ppReader = NULL;
tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
} else {
*ppReader = pDelFReader;
}
return code; return code;
} }
......
...@@ -67,6 +67,13 @@ extern int32_t tRowInfoCmprFn(const void* p1, const void* p2); ...@@ -67,6 +67,13 @@ extern int32_t tRowInfoCmprFn(const void* p1, const void* p2);
extern int32_t tsdbReadDataBlockEx(SDataFReader* pReader, SDataBlk* pDataBlk, SBlockData* pBlockData); extern int32_t tsdbReadDataBlockEx(SDataFReader* pReader, SDataBlk* pDataBlk, SBlockData* pBlockData);
extern int32_t tsdbUpdateTableSchema(SMeta* pMeta, int64_t suid, int64_t uid, SSkmInfo* pSkmInfo); extern int32_t tsdbUpdateTableSchema(SMeta* pMeta, int64_t suid, int64_t uid, SSkmInfo* pSkmInfo);
static int32_t tFDataIterCmprFn(const SRBTreeNode* pNode1, const SRBTreeNode* pNode2) {
SFDataIter* pIter1 = (SFDataIter*)(((uint8_t*)pNode1) - offsetof(SFDataIter, n));
SFDataIter* pIter2 = (SFDataIter*)(((uint8_t*)pNode2) - offsetof(SFDataIter, n));
return tRowInfoCmprFn(&pIter1->rInfo, &pIter2->rInfo);
}
static int32_t tsdbSnapReadOpenFile(STsdbSnapReader* pReader) { static int32_t tsdbSnapReadOpenFile(STsdbSnapReader* pReader) {
int32_t code = 0; int32_t code = 0;
...@@ -79,7 +86,7 @@ static int32_t tsdbSnapReadOpenFile(STsdbSnapReader* pReader) { ...@@ -79,7 +86,7 @@ static int32_t tsdbSnapReadOpenFile(STsdbSnapReader* pReader) {
if (code) goto _err; if (code) goto _err;
pReader->pIter = NULL; pReader->pIter = NULL;
tRBTreeCreate(&pReader->rbt, tRowInfoCmprFn); tRBTreeCreate(&pReader->rbt, tFDataIterCmprFn);
// .data file // .data file
SFDataIter* pIter = &pReader->aFDataIter[0]; SFDataIter* pIter = &pReader->aFDataIter[0];
...@@ -421,7 +428,7 @@ static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) { ...@@ -421,7 +428,7 @@ static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) {
n += tPutDelData((*ppData) + n, pDelData); n += tPutDelData((*ppData) + n, pDelData);
} }
tsdbInfo("vgId:%d, vnode snapshot tsdb read del data for %s, suid:%" PRId64 " uid:%d" PRId64 " size:%d", tsdbInfo("vgId:%d, vnode snapshot tsdb read del data for %s, suid:%" PRId64 " uid:%" PRId64 " size:%d",
TD_VID(pTsdb->pVnode), pTsdb->path, pDelIdx->suid, pDelIdx->uid, size); TD_VID(pTsdb->pVnode), pTsdb->path, pDelIdx->suid, pDelIdx->uid, size);
break; break;
...@@ -431,7 +438,7 @@ _exit: ...@@ -431,7 +438,7 @@ _exit:
return code; return code;
_err: _err:
tsdbError("vgId:%d, vnode snapshot tsdb read del for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->pVnode, tsdbError("vgId:%d, vnode snapshot tsdb read del for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path,
tstrerror(code)); tstrerror(code));
return code; return code;
} }
...@@ -1247,20 +1254,21 @@ _err: ...@@ -1247,20 +1254,21 @@ _err:
// APIs // APIs
int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWriter** ppWriter) { int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWriter** ppWriter) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
STsdbSnapWriter* pWriter = NULL; STsdbSnapWriter* pWriter = NULL;
// alloc // alloc
pWriter = (STsdbSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter)); pWriter = (STsdbSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
if (pWriter == NULL) { if (pWriter == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
pWriter->pTsdb = pTsdb; pWriter->pTsdb = pTsdb;
pWriter->sver = sver; pWriter->sver = sver;
pWriter->ever = ever; pWriter->ever = ever;
code = tsdbFSCopy(pTsdb, &pWriter->fs); code = tsdbFSCopy(pTsdb, &pWriter->fs);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
// config // config
pWriter->minutes = pTsdb->keepCfg.days; pWriter->minutes = pTsdb->keepCfg.days;
...@@ -1272,7 +1280,7 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr ...@@ -1272,7 +1280,7 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr
// SNAP_DATA_TSDB // SNAP_DATA_TSDB
code = tBlockDataCreate(&pWriter->bData); code = tBlockDataCreate(&pWriter->bData);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
pWriter->fid = INT32_MIN; pWriter->fid = INT32_MIN;
pWriter->id = (TABLEID){0}; pWriter->id = (TABLEID){0};
...@@ -1280,53 +1288,67 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr ...@@ -1280,53 +1288,67 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr
pWriter->dReader.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); pWriter->dReader.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
if (pWriter->dReader.aBlockIdx == NULL) { if (pWriter->dReader.aBlockIdx == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
code = tBlockDataCreate(&pWriter->dReader.bData); code = tBlockDataCreate(&pWriter->dReader.bData);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
// Writer // Writer
pWriter->dWriter.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); pWriter->dWriter.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
if (pWriter->dWriter.aBlockIdx == NULL) { if (pWriter->dWriter.aBlockIdx == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
pWriter->dWriter.aSttBlk = taosArrayInit(0, sizeof(SSttBlk)); pWriter->dWriter.aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
if (pWriter->dWriter.aSttBlk == NULL) { if (pWriter->dWriter.aSttBlk == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
code = tBlockDataCreate(&pWriter->dWriter.bData); code = tBlockDataCreate(&pWriter->dWriter.bData);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
code = tBlockDataCreate(&pWriter->dWriter.sData); code = tBlockDataCreate(&pWriter->dWriter.sData);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
// SNAP_DATA_DEL // SNAP_DATA_DEL
pWriter->aDelIdxR = taosArrayInit(0, sizeof(SDelIdx)); pWriter->aDelIdxR = taosArrayInit(0, sizeof(SDelIdx));
if (pWriter->aDelIdxR == NULL) { if (pWriter->aDelIdxR == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
pWriter->aDelData = taosArrayInit(0, sizeof(SDelData)); pWriter->aDelData = taosArrayInit(0, sizeof(SDelData));
if (pWriter->aDelData == NULL) { if (pWriter->aDelData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
pWriter->aDelIdxW = taosArrayInit(0, sizeof(SDelIdx)); pWriter->aDelIdxW = taosArrayInit(0, sizeof(SDelIdx));
if (pWriter->aDelIdxW == NULL) { if (pWriter->aDelIdxW == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
*ppWriter = pWriter; _exit:
if (code) {
tsdbInfo("vgId:%d, tsdb snapshot writer open for %s succeed", TD_VID(pTsdb->pVnode), pTsdb->path); tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
return code; *ppWriter = NULL;
_err: if (pWriter) {
tsdbError("vgId:%d, tsdb snapshot writer open for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path, if (pWriter->aDelIdxW) taosArrayDestroy(pWriter->aDelIdxW);
tstrerror(code)); if (pWriter->aDelData) taosArrayDestroy(pWriter->aDelData);
*ppWriter = NULL; if (pWriter->aDelIdxR) taosArrayDestroy(pWriter->aDelIdxR);
tBlockDataDestroy(&pWriter->dWriter.sData, 1);
tBlockDataDestroy(&pWriter->dWriter.bData, 1);
if (pWriter->dWriter.aSttBlk) taosArrayDestroy(pWriter->dWriter.aSttBlk);
if (pWriter->dWriter.aBlockIdx) taosArrayDestroy(pWriter->dWriter.aBlockIdx);
tBlockDataDestroy(&pWriter->dReader.bData, 1);
if (pWriter->dReader.aBlockIdx) taosArrayDestroy(pWriter->dReader.aBlockIdx);
tBlockDataDestroy(&pWriter->bData, 1);
tsdbFSDestroy(&pWriter->fs);
taosMemoryFree(pWriter);
}
} else {
tsdbDebug("vgId:%d, tsdb snapshot writer open for %s succeed", TD_VID(pTsdb->pVnode), pTsdb->path);
*ppWriter = pWriter;
}
return code; return code;
} }
......
...@@ -34,7 +34,9 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq *pMsg, SSubmitRsp * ...@@ -34,7 +34,9 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq *pMsg, SSubmitRsp *
} }
// loop to insert // loop to insert
tInitSubmitMsgIter(pMsg, &msgIter); if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) {
return -1;
}
while (true) { while (true) {
SSubmitBlkRsp r = {0}; SSubmitBlkRsp r = {0};
tGetSubmitMsgNext(&msgIter, &pBlock); tGetSubmitMsgNext(&msgIter, &pBlock);
......
...@@ -141,17 +141,17 @@ void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) { ...@@ -141,17 +141,17 @@ void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) {
} }
void vnodeBufPoolFree(SVBufPool *pPool, void *p) { void vnodeBufPoolFree(SVBufPool *pPool, void *p) {
uint8_t *ptr = (uint8_t *)p; // uint8_t *ptr = (uint8_t *)p;
SVBufPoolNode *pNode; // SVBufPoolNode *pNode;
if (ptr < pPool->node.data || ptr >= pPool->node.data + pPool->node.size) { // if (ptr < pPool->node.data || ptr >= pPool->node.data + pPool->node.size) {
pNode = &((SVBufPoolNode *)p)[-1]; // pNode = &((SVBufPoolNode *)p)[-1];
*pNode->pnext = pNode->prev; // *pNode->pnext = pNode->prev;
pNode->prev->pnext = pNode->pnext; // pNode->prev->pnext = pNode->pnext;
pPool->size = pPool->size - sizeof(*pNode) - pNode->size; // pPool->size = pPool->size - sizeof(*pNode) - pNode->size;
taosMemoryFree(pNode); // taosMemoryFree(pNode);
} // }
} }
void vnodeBufPoolRef(SVBufPool *pPool) { void vnodeBufPoolRef(SVBufPool *pPool) {
......
...@@ -46,11 +46,17 @@ int vnodeInit(int nthreads) { ...@@ -46,11 +46,17 @@ int vnodeInit(int nthreads) {
return 0; return 0;
} }
vnodeGlobal.stop = 0; taosThreadMutexInit(&vnodeGlobal.mutex, NULL);
taosThreadCondInit(&vnodeGlobal.hasTask, NULL);
taosThreadMutexLock(&vnodeGlobal.mutex);
vnodeGlobal.stop = 0;
vnodeGlobal.queue.next = &vnodeGlobal.queue; vnodeGlobal.queue.next = &vnodeGlobal.queue;
vnodeGlobal.queue.prev = &vnodeGlobal.queue; vnodeGlobal.queue.prev = &vnodeGlobal.queue;
taosThreadMutexUnlock(&(vnodeGlobal.mutex));
vnodeGlobal.nthreads = nthreads; vnodeGlobal.nthreads = nthreads;
vnodeGlobal.threads = taosMemoryCalloc(nthreads, sizeof(TdThread)); vnodeGlobal.threads = taosMemoryCalloc(nthreads, sizeof(TdThread));
if (vnodeGlobal.threads == NULL) { if (vnodeGlobal.threads == NULL) {
...@@ -59,9 +65,6 @@ int vnodeInit(int nthreads) { ...@@ -59,9 +65,6 @@ int vnodeInit(int nthreads) {
return -1; return -1;
} }
taosThreadMutexInit(&vnodeGlobal.mutex, NULL);
taosThreadCondInit(&vnodeGlobal.hasTask, NULL);
for (int i = 0; i < nthreads; i++) { for (int i = 0; i < nthreads; i++) {
taosThreadCreate(&(vnodeGlobal.threads[i]), NULL, loop, NULL); taosThreadCreate(&(vnodeGlobal.threads[i]), NULL, loop, NULL);
} }
......
...@@ -38,7 +38,7 @@ int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) { ...@@ -38,7 +38,7 @@ int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
if (taosMkDir(path)) { if (taosMkDir(path)) {
return TAOS_SYSTEM_ERROR(errno); return TAOS_SYSTEM_ERROR(errno);
} }
strcpy(dir, path); snprintf(dir, TSDB_FILENAME_LEN, "%s", path);
} }
if (pCfg) { if (pCfg) {
...@@ -51,7 +51,7 @@ int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) { ...@@ -51,7 +51,7 @@ int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
info.state.commitID = 0; info.state.commitID = 0;
if (vnodeSaveInfo(dir, &info) < 0 || vnodeCommitInfo(dir, &info) < 0) { if (vnodeSaveInfo(dir, &info) < 0 || vnodeCommitInfo(dir, &info) < 0) {
vError("vgId:%d, failed to save vnode config since %s", pCfg->vgId, tstrerror(terrno)); vError("vgId:%d, failed to save vnode config since %s", pCfg ? pCfg->vgId : 0, tstrerror(terrno));
return -1; return -1;
} }
......
...@@ -166,7 +166,7 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) ...@@ -166,7 +166,7 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
if (*ppData) { if (*ppData) {
goto _exit; goto _exit;
} else { } else {
pReader->tqHandleDone = 1; pReader->tqOffsetDone = 1;
code = tqOffsetReaderClose(&pReader->pTqOffsetReader); code = tqOffsetReaderClose(&pReader->pTqOffsetReader);
if (code) goto _err; if (code) goto _err;
} }
...@@ -219,7 +219,7 @@ _exit: ...@@ -219,7 +219,7 @@ _exit:
return code; return code;
_err: _err:
vError("vgId:% vnode snapshot read failed since %s", TD_VID(pReader->pVnode), tstrerror(code)); vError("vgId:%d vnode snapshot read failed since %s", TD_VID(pReader->pVnode), tstrerror(code));
return code; return code;
} }
...@@ -260,7 +260,10 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWr ...@@ -260,7 +260,10 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWr
// commit it // commit it
code = vnodeCommit(pVnode); code = vnodeCommit(pVnode);
if (code) goto _err; if (code) {
taosMemoryFree(pWriter);
goto _err;
}
// inc commit ID // inc commit ID
pVnode->state.commitID++; pVnode->state.commitID++;
......
...@@ -68,7 +68,10 @@ int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -68,7 +68,10 @@ int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int64_t ctime = taosGetTimestampMs(); int64_t ctime = taosGetTimestampMs();
tb_uid_t uid; tb_uid_t uid;
tInitSubmitMsgIter(pSubmitReq, &msgIter); if (tInitSubmitMsgIter(pSubmitReq, &msgIter) < 0) {
code = terrno;
goto _err;
}
for (;;) { for (;;) {
tGetSubmitMsgNext(&msgIter, &pBlock); tGetSubmitMsgNext(&msgIter, &pBlock);
......
...@@ -34,9 +34,9 @@ static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage ...@@ -34,9 +34,9 @@ static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage
static int tdbPagerWritePageToJournal(SPager *pPager, SPage *pPage); static int tdbPagerWritePageToJournal(SPager *pPager, SPage *pPage);
static int tdbPagerWritePageToDB(SPager *pPager, SPage *pPage); static int tdbPagerWritePageToDB(SPager *pPager, SPage *pPage);
static FORCE_INLINE int32_t pageCmpFn(const void *lhs, const void *rhs) { static FORCE_INLINE int32_t pageCmpFn(const SRBTreeNode *lhs, const SRBTreeNode *rhs) {
SPage *pPageL = (SPage *)(((uint8_t *)lhs) - sizeof(SRBTreeNode)); SPage *pPageL = (SPage *)(((uint8_t *)lhs) - offsetof(SPage, node));
SPage *pPageR = (SPage *)(((uint8_t *)rhs) - sizeof(SRBTreeNode)); SPage *pPageR = (SPage *)(((uint8_t *)rhs) - offsetof(SPage, node));
SPgno pgnoL = TDB_PAGE_PGNO(pPageL); SPgno pgnoL = TDB_PAGE_PGNO(pPageL);
SPgno pgnoR = TDB_PAGE_PGNO(pPageR); SPgno pgnoR = TDB_PAGE_PGNO(pPageR);
......
...@@ -219,7 +219,7 @@ SRBTreeNode *tRBTreePut(SRBTree *pTree, SRBTreeNode *z) { ...@@ -219,7 +219,7 @@ SRBTreeNode *tRBTreePut(SRBTree *pTree, SRBTreeNode *z) {
while (temp != pTree->NIL) { while (temp != pTree->NIL) {
y = temp; y = temp;
int32_t c = pTree->cmprFn(RBTREE_NODE_PAYLOAD(z), RBTREE_NODE_PAYLOAD(temp)); int32_t c = pTree->cmprFn(z, temp);
if (c < 0) { if (c < 0) {
temp = temp->left; temp = temp->left;
} else if (c > 0) { } else if (c > 0) {
...@@ -232,7 +232,7 @@ SRBTreeNode *tRBTreePut(SRBTree *pTree, SRBTreeNode *z) { ...@@ -232,7 +232,7 @@ SRBTreeNode *tRBTreePut(SRBTree *pTree, SRBTreeNode *z) {
if (y == pTree->NIL) { if (y == pTree->NIL) {
pTree->root = z; pTree->root = z;
} else if (pTree->cmprFn(RBTREE_NODE_PAYLOAD(z), RBTREE_NODE_PAYLOAD(y)) < 0) { } else if (pTree->cmprFn(z, y) < 0) {
y->left = z; y->left = z;
} else { } else {
y->right = z; y->right = z;
...@@ -245,10 +245,10 @@ SRBTreeNode *tRBTreePut(SRBTree *pTree, SRBTreeNode *z) { ...@@ -245,10 +245,10 @@ SRBTreeNode *tRBTreePut(SRBTree *pTree, SRBTreeNode *z) {
tRBTreePutFix(pTree, z); tRBTreePutFix(pTree, z);
// update min/max node // update min/max node
if (pTree->min == pTree->NIL || pTree->cmprFn(RBTREE_NODE_PAYLOAD(pTree->min), RBTREE_NODE_PAYLOAD(z)) > 0) { if (pTree->min == pTree->NIL || pTree->cmprFn(pTree->min, z) > 0) {
pTree->min = z; pTree->min = z;
} }
if (pTree->max == pTree->NIL || pTree->cmprFn(RBTREE_NODE_PAYLOAD(pTree->max), RBTREE_NODE_PAYLOAD(z)) < 0) { if (pTree->max == pTree->NIL || pTree->cmprFn(pTree->max, z) < 0) {
pTree->max = z; pTree->max = z;
} }
pTree->n++; pTree->n++;
...@@ -309,11 +309,11 @@ SRBTreeNode *tRBTreeDropByKey(SRBTree *pTree, void *pKey) { ...@@ -309,11 +309,11 @@ SRBTreeNode *tRBTreeDropByKey(SRBTree *pTree, void *pKey) {
return pNode; return pNode;
} }
SRBTreeNode *tRBTreeGet(SRBTree *pTree, void *pKey) { SRBTreeNode *tRBTreeGet(SRBTree *pTree, const SRBTreeNode *pKeyNode) {
SRBTreeNode *pNode = pTree->root; SRBTreeNode *pNode = pTree->root;
while (pNode != pTree->NIL) { while (pNode != pTree->NIL) {
int32_t c = pTree->cmprFn(pKey, RBTREE_NODE_PAYLOAD(pNode)); int32_t c = pTree->cmprFn(pKeyNode, pNode);
if (c < 0) { if (c < 0) {
pNode = pNode->left; pNode = pNode->left;
......
...@@ -145,7 +145,10 @@ void tSkipListPutBatchByIter(SSkipList *pSkipList, void *iter, iter_next_fn_t it ...@@ -145,7 +145,10 @@ void tSkipListPutBatchByIter(SSkipList *pSkipList, void *iter, iter_next_fn_t it
tSkipListWLock(pSkipList); tSkipListWLock(pSkipList);
void *pData = iterate(iter); void *pData = iterate(iter);
if (pData == NULL) return; if (pData == NULL) {
tSkipListUnlock(pSkipList);
return;
}
// backward to put the first data // backward to put the first data
hasDup = tSkipListGetPosToPut(pSkipList, backward, pData); hasDup = tSkipListGetPosToPut(pSkipList, backward, pData);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册