diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 4ccfea40510a6148ceec57893629c693d9d7a1cd..69785570f7bf975e4fb754383ed99ca18f1ec8b1 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -161,7 +161,6 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { // open sync if (vnodeSyncOpen(pVnode, dir)) { vError("vgId:%d, failed to open sync since %s", TD_VID(pVnode), tstrerror(terrno)); - terrno = TSDB_CODE_OUT_OF_MEMORY; goto _err; } diff --git a/source/libs/sync/inc/syncUtil.h b/source/libs/sync/inc/syncUtil.h index 7ecff7ae97bf04da20c1817c413a44a0dd32e8c8..96e22720e8b2b0fb4defc5d5b46e8d765fc54efb 100644 --- a/source/libs/sync/inc/syncUtil.h +++ b/source/libs/sync/inc/syncUtil.h @@ -32,7 +32,7 @@ uint64_t syncUtilAddr2U64(const char* host, uint16_t port); void syncUtilU642Addr(uint64_t u64, char* host, size_t len, uint16_t* port); void syncUtilnodeInfo2EpSet(const SNodeInfo* pNodeInfo, SEpSet* pEpSet); void syncUtilraftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet); -void syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaftId* raftId); +bool syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaftId* raftId); bool syncUtilSameId(const SRaftId* pId1, const SRaftId* pId2); bool syncUtilEmptyId(const SRaftId* pId); diff --git a/source/libs/sync/src/syncIndexMgr.c b/source/libs/sync/src/syncIndexMgr.c index 3bda9bcd51a1fe41fbeb09a1e4a39c3a53f1cd74..28b5313ac514ef98f4295cd547b947447d9c09bc 100644 --- a/source/libs/sync/src/syncIndexMgr.c +++ b/source/libs/sync/src/syncIndexMgr.c @@ -20,7 +20,10 @@ SSyncIndexMgr *syncIndexMgrCreate(SSyncNode *pSyncNode) { SSyncIndexMgr *pSyncIndexMgr = taosMemoryMalloc(sizeof(SSyncIndexMgr)); - ASSERT(pSyncIndexMgr != NULL); + if (pSyncIndexMgr == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } memset(pSyncIndexMgr, 0, sizeof(SSyncIndexMgr)); pSyncIndexMgr->replicas = &(pSyncNode->replicasId); @@ -248,4 +251,4 @@ SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftI } ASSERT(0); return -1; -} \ No newline at end of file +} diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 17157fbd2327ce126b4e63e509c7f0935539860e..3d79c6c46477f17f59e35a259fcea21329bcda3d 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -51,15 +51,17 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths); int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg); int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg); -// life cycle -static void syncFreeNode(void* param); // --------------------------------- +static void syncNodeFreeCb(void *param) { + syncNodeClose(param); + param = NULL; +} int32_t syncInit() { int32_t ret = 0; if (!syncEnvIsStart()) { - tsNodeRefId = taosOpenRef(200, syncFreeNode); + tsNodeRefId = taosOpenRef(200, syncNodeFreeCb); if (tsNodeRefId < 0) { sError("failed to init node ref"); syncCleanUp(); @@ -86,11 +88,15 @@ void syncCleanUp() { int64_t syncOpen(const SSyncInfo* pSyncInfo) { SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo); - ASSERT(pSyncNode != NULL); + if (pSyncNode == NULL) { + sError("failed to open sync node. vgId:%d", pSyncInfo->vgId); + return -1; + } pSyncNode->rid = taosAddRef(tsNodeRefId, pSyncNode); if (pSyncNode->rid < 0) { - syncFreeNode(pSyncNode); + syncNodeClose(pSyncNode); + pSyncNode = NULL; return -1; } @@ -136,11 +142,9 @@ void syncStartStandBy(int64_t rid) { void syncStop(int64_t rid) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) return; - int32_t vgId = pSyncNode->vgId; - syncNodeClose(pSyncNode); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + taosRemoveRef(tsNodeRefId, rid); sDebug("vgId:%d, sync rid:%" PRId64 " is removed from rsetId:%" PRId64, vgId, rid, tsNodeRefId); } @@ -210,7 +214,7 @@ int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) { taosReleaseRef(tsNodeRefId, pSyncNode->rid); terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR; - sError("syncNodeCheckNewConfig error"); + sError("invalid new config. vgId:%d", pSyncNode->vgId); return -1; } @@ -237,7 +241,7 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg) { if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) { taosReleaseRef(tsNodeRefId, pSyncNode->rid); terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR; - sError("syncNodeCheckNewConfig error"); + sError("invalid new config. vgId:%d", pSyncNode->vgId); return -1; } @@ -941,16 +945,18 @@ _END: SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { SSyncInfo* pSyncInfo = (SSyncInfo*)pOldSyncInfo; - SSyncNode* pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(SSyncNode)); - ASSERT(pSyncNode != NULL); - memset(pSyncNode, 0, sizeof(SSyncNode)); + SSyncNode* pSyncNode = (SSyncNode*)taosMemoryCalloc(1, sizeof(SSyncNode)); + if (pSyncNode == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _error; + } int32_t ret = 0; if (!taosDirExist((char*)(pSyncInfo->path))) { if (taosMkDir(pSyncInfo->path) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); sError("failed to create dir:%s since %s", pSyncInfo->path, terrstr()); - return NULL; + goto _error; } } @@ -963,15 +969,21 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { meta.lastConfigIndex = SYNC_INDEX_INVALID; meta.batchSize = pSyncInfo->batchSize; ret = raftCfgCreateFile((SSyncCfg*)&(pSyncInfo->syncCfg), meta, pSyncNode->configPath); - ASSERT(ret == 0); - + if (ret != 0) { + sError("failed to create raft cfg file. configPath: %s", pSyncNode->configPath); + goto _error; + } } else { // update syncCfg by raft_config.json pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath); - ASSERT(pSyncNode->pRaftCfg != NULL); + if (pSyncNode->pRaftCfg == NULL) { + sError("failed to open raft cfg file. path:%s", pSyncNode->configPath); + goto _error; + } pSyncInfo->syncCfg = pSyncNode->pRaftCfg->cfg; raftCfgClose(pSyncNode->pRaftCfg); + pSyncNode->pRaftCfg = NULL; } // init by SSyncInfo @@ -988,11 +1000,17 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { // init raft config pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath); - ASSERT(pSyncNode->pRaftCfg != NULL); + if (pSyncNode->pRaftCfg == NULL) { + sError("failed to open raft cfg file. path:%s", pSyncNode->configPath); + goto _error; + } // init internal pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex]; - syncUtilnodeInfo2raftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId); + if (!syncUtilnodeInfo2raftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId)) { + sError("failed to determine my raft member id. vgId:%d", pSyncNode->vgId); + goto _error; + } // init peersNum, peers, peersId pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1; @@ -1004,17 +1022,24 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { } } for (int i = 0; i < pSyncNode->peersNum; ++i) { - syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]); + if (!syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i])) { + sError("failed to determine raft member id. vgId:%d, peer:%d", pSyncNode->vgId, i); + goto _error; + } } // init replicaNum, replicasId pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum; for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) { - syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]); + if(!syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i])) { + sError("failed to determine raft member id. vgId:%d, replica:%d", pSyncNode->vgId, i); + goto _error; + } } // init raft algorithm pSyncNode->pFsm = pSyncInfo->pFsm; + pSyncInfo->pFsm = NULL; pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum); pSyncNode->leaderCache = EMPTY_RAFT_ID; @@ -1047,29 +1072,50 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { // init TLA+ server vars pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER; pSyncNode->pRaftStore = raftStoreOpen(pSyncNode->raftStorePath); - ASSERT(pSyncNode->pRaftStore != NULL); + if (pSyncNode->pRaftStore == NULL) { + sError("failed to open raft store. path: %s", pSyncNode->raftStorePath); + goto _error; + } // init TLA+ candidate vars pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode); - ASSERT(pSyncNode->pVotesGranted != NULL); + if (pSyncNode->pVotesGranted == NULL) { + sError("failed to create VotesGranted. vgId:%d", pSyncNode->vgId); + goto _error; + } pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode); - ASSERT(pSyncNode->pVotesRespond != NULL); + if (pSyncNode->pVotesRespond == NULL) { + sError("failed to create VotesRespond. vgId:%d", pSyncNode->vgId); + goto _error; + } // init TLA+ leader vars pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode); - ASSERT(pSyncNode->pNextIndex != NULL); + if (pSyncNode->pNextIndex == NULL) { + sError("failed to create SyncIndexMgr. vgId:%d", pSyncNode->vgId); + goto _error; + } pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode); - ASSERT(pSyncNode->pMatchIndex != NULL); + if (pSyncNode->pMatchIndex == NULL) { + sError("failed to create SyncIndexMgr. vgId:%d", pSyncNode->vgId); + goto _error; + } // init TLA+ log vars pSyncNode->pLogStore = logStoreCreate(pSyncNode); - ASSERT(pSyncNode->pLogStore != NULL); + if (pSyncNode->pLogStore == NULL) { + sError("failed to create SyncLogStore. vgId:%d", pSyncNode->vgId); + goto _error; + } SyncIndex commitIndex = SYNC_INDEX_INVALID; if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) { SSnapshot snapshot = {0}; int32_t code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); - ASSERT(code == 0); + if (code != 0) { + sError("failed to get snapshot info. vgId:%d, code:%d", pSyncNode->vgId, code); + goto _error; + } if (snapshot.lastApplyIndex > commitIndex) { commitIndex = snapshot.lastApplyIndex; syncNodeEventLog(pSyncNode, "reset commit index by snapshot"); @@ -1132,7 +1178,10 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { // tools pSyncNode->pSyncRespMgr = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS); - ASSERT(pSyncNode->pSyncRespMgr != NULL); + if (pSyncNode->pSyncRespMgr == NULL) { + sError("failed to create SyncRespMgr. vgId:%d", pSyncNode->vgId); + goto _error; + } // restore state pSyncNode->restoreFinish = false; @@ -1162,6 +1211,15 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { syncNodeEventLog(pSyncNode, "sync open"); return pSyncNode; + +_error: + if (pSyncInfo->pFsm) { + taosMemoryFree(pSyncInfo->pFsm); + pSyncInfo->pFsm = NULL; + } + syncNodeClose(pSyncNode); + pSyncNode = NULL; + return NULL; } void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) { @@ -1214,20 +1272,28 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) { void syncNodeClose(SSyncNode* pSyncNode) { syncNodeEventLog(pSyncNode, "sync close"); - + if (pSyncNode == NULL) { + return; + } int32_t ret; - ASSERT(pSyncNode != NULL); ret = raftStoreClose(pSyncNode->pRaftStore); ASSERT(ret == 0); syncRespMgrDestroy(pSyncNode->pSyncRespMgr); + pSyncNode->pSyncRespMgr = NULL; voteGrantedDestroy(pSyncNode->pVotesGranted); + pSyncNode->pVotesGranted = NULL; votesRespondDestory(pSyncNode->pVotesRespond); + pSyncNode->pVotesRespond = NULL; syncIndexMgrDestroy(pSyncNode->pNextIndex); + pSyncNode->pNextIndex = NULL; syncIndexMgrDestroy(pSyncNode->pMatchIndex); + pSyncNode->pMatchIndex = NULL; logStoreDestory(pSyncNode->pLogStore); + pSyncNode->pLogStore = NULL; raftCfgClose(pSyncNode->pRaftCfg); + pSyncNode->pRaftCfg = NULL; syncNodeStopPingTimer(pSyncNode); syncNodeStopElectTimer(pSyncNode); @@ -1249,8 +1315,7 @@ void syncNodeClose(SSyncNode* pSyncNode) { pSyncNode->pNewNodeReceiver = NULL; } - // free memory in syncFreeNode - // taosMemoryFree(pSyncNode); + taosMemoryFree(pSyncNode); } // option @@ -2534,7 +2599,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { return; } } else { - sError("syncNodeEqHeartbeatTimer FpEqMsg is NULL"); + sError("vgId:%d, enqueue msg cb ptr (i.e. FpEqMsg) not set.", pSyncNode->vgId); } syncTimeoutDestroy(pSyncMsg); @@ -2774,14 +2839,6 @@ int32_t syncNodeOnClientRequestBatchCb(SSyncNode* ths, SyncClientRequestBatch* p return 0; } -static void syncFreeNode(void* param) { - SSyncNode* pNode = param; - // inner object already free - // syncNodePrint2((char*)"==syncFreeNode==", pNode); - - taosMemoryFree(pNode); -} - const char* syncStr(ESyncState state) { switch (state) { case TAOS_SYNC_STATE_FOLLOWER: diff --git a/source/libs/sync/src/syncRaftCfg.c b/source/libs/sync/src/syncRaftCfg.c index ab404d1b9af744b51b508cd1f870482c79756ea1..57126d0871da0ab80cf718260377fa8754581d65 100644 --- a/source/libs/sync/src/syncRaftCfg.c +++ b/source/libs/sync/src/syncRaftCfg.c @@ -364,8 +364,6 @@ int32_t raftCfgCreateFile(SSyncCfg *pCfg, SRaftCfgMeta meta, const char *path) { int32_t sysErr = errno; const char *sysErrStr = strerror(errno); sError("create raft cfg file error, err:%d %X, msg:%s, syserr:%d, sysmsg:%s", err, err, errStr, sysErr, sysErrStr); - ASSERT(0); - return -1; } diff --git a/source/libs/sync/src/syncRaftStore.c b/source/libs/sync/src/syncRaftStore.c index 29f78b582f1f1ddab18dba54b3b495bf3aa8af98..22b47a2c45e14216b1554c9e606f2d66b1d07b5e 100644 --- a/source/libs/sync/src/syncRaftStore.c +++ b/source/libs/sync/src/syncRaftStore.c @@ -28,7 +28,7 @@ SRaftStore *raftStoreOpen(const char *path) { SRaftStore *pRaftStore = taosMemoryMalloc(sizeof(SRaftStore)); if (pRaftStore == NULL) { - sError("raftStoreOpen malloc error"); + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } memset(pRaftStore, 0, sizeof(*pRaftStore)); @@ -72,7 +72,9 @@ static int32_t raftStoreInit(SRaftStore *pRaftStore) { } int32_t raftStoreClose(SRaftStore *pRaftStore) { - ASSERT(pRaftStore != NULL); + if (pRaftStore == NULL) { + return 0; + } taosCloseFile(&pRaftStore->pFile); taosMemoryFree(pRaftStore); diff --git a/source/libs/sync/src/syncRespMgr.c b/source/libs/sync/src/syncRespMgr.c index d7ed864180335eb5a07ea58298c574fb71265fe2..103c2254768a3b5bfc6d5c8090828f999b2e8c9e 100644 --- a/source/libs/sync/src/syncRespMgr.c +++ b/source/libs/sync/src/syncRespMgr.c @@ -19,6 +19,10 @@ SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl) { SSyncRespMgr *pObj = (SSyncRespMgr *)taosMemoryMalloc(sizeof(SSyncRespMgr)); + if (pObj == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } memset(pObj, 0, sizeof(SSyncRespMgr)); pObj->pRespHash = diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 0be3392a9a52b69e29cbcedcb910cdbc0f9a6234..68d81813ac760a37b78bfdc12aee59624b296c48 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -35,7 +35,10 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI SSyncSnapshotSender *pSender = NULL; if (condition) { pSender = taosMemoryMalloc(sizeof(SSyncSnapshotSender)); - ASSERT(pSender != NULL); + if (pSender == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } memset(pSender, 0, sizeof(*pSender)); pSender->start = false; diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 325073f3660172a1067d0bf92e11a8ae9f88fa01..6f234631dae8620df80fe5aba0366a5cb8ad62c7 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -26,7 +26,8 @@ uint64_t syncUtilAddr2U64(const char* host, uint16_t port) { uint32_t hostU32 = taosGetIpv4FromFqdn(host); if (hostU32 == (uint32_t)-1) { - sError("Get IP address error"); + sError("failed to resolve ipv4 addr. host:%s", host); + terrno = TSDB_CODE_TSC_INVALID_FQDN; return -1; } @@ -84,13 +85,18 @@ void syncUtilraftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet) { addEpIntoEpSet(pEpSet, host, port); } -void syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaftId* raftId) { +bool syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaftId* raftId) { uint32_t ipv4 = taosGetIpv4FromFqdn(pNodeInfo->nodeFqdn); - ASSERT(ipv4 != 0xFFFFFFFF); + if (ipv4 == 0xFFFFFFFF || ipv4 == 1) { + sError("failed to resolve ipv4 addr. fqdn: %s", pNodeInfo->nodeFqdn); + terrno = TSDB_CODE_TSC_INVALID_FQDN; + return false; + } char ipbuf[128] = {0}; tinet_ntoa(ipbuf, ipv4); raftId->addr = syncUtilAddr2U64(ipbuf, pNodeInfo->nodePort); raftId->vgId = vgId; + return true; } bool syncUtilSameId(const SRaftId* pId1, const SRaftId* pId2) { @@ -310,4 +316,4 @@ void syncUtilJson2Line(char* jsonStr) { q++; } } -} \ No newline at end of file +} diff --git a/source/libs/sync/src/syncVoteMgr.c b/source/libs/sync/src/syncVoteMgr.c index 1d46d71a05b66d30fba02ec342014a56f388e73e..641bb32d2d56a0bf745c6cb4c5fb43ccaaaa6f75 100644 --- a/source/libs/sync/src/syncVoteMgr.c +++ b/source/libs/sync/src/syncVoteMgr.c @@ -24,7 +24,10 @@ static void voteGrantedClearVotes(SVotesGranted *pVotesGranted) { SVotesGranted *voteGrantedCreate(SSyncNode *pSyncNode) { SVotesGranted *pVotesGranted = taosMemoryMalloc(sizeof(SVotesGranted)); - ASSERT(pVotesGranted != NULL); + if (pVotesGranted == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } memset(pVotesGranted, 0, sizeof(SVotesGranted)); pVotesGranted->replicas = &(pSyncNode->replicasId);