未验证 提交 44b8d659 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #17130 from taosdata/FIX/TD-19260-3.0

enh: improve error handling in syncNodeOpen instead of using assert, with allocated resources cleaned up properly
......@@ -85,6 +85,8 @@ _err:
int tsdbClose(STsdb **pTsdb) {
if (*pTsdb) {
taosThreadRwlockDestroy(&(*pTsdb)->rwLock);
tsdbMemTableDestroy((*pTsdb)->mem);
(*pTsdb)->mem = NULL;
tsdbFSClose(*pTsdb);
tsdbCloseCache(*pTsdb);
taosMemoryFreeClear(*pTsdb);
......
......@@ -53,6 +53,10 @@ int vnodeCloseBufPool(SVnode *pVnode) {
vnodeBufPoolDestroy(pPool);
}
if (pVnode->inUse) {
vnodeBufPoolDestroy(pVnode->inUse);
pVnode->inUse = NULL;
}
vDebug("vgId:%d, vnode buffer pool is closed", TD_VID(pVnode));
return 0;
......@@ -177,4 +181,4 @@ void vnodeBufPoolUnRef(SVBufPool *pPool) {
taosThreadMutexUnlock(&pVnode->mutex);
}
}
\ No newline at end of file
}
......@@ -161,7 +161,6 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
// open sync
if (vnodeSyncOpen(pVnode, dir)) {
vError("vgId:%d, failed to open sync since %s", TD_VID(pVnode), tstrerror(terrno));
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
......@@ -174,6 +173,7 @@ _err:
if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb);
if (pVnode->pSma) smaClose(pVnode->pSma);
if (pVnode->pMeta) metaClose(pVnode->pMeta);
if (pVnode->pPool) vnodeCloseBufPool(pVnode);
tsem_destroy(&(pVnode->canCommit));
taosMemoryFree(pVnode);
......
......@@ -32,7 +32,7 @@ uint64_t syncUtilAddr2U64(const char* host, uint16_t port);
void syncUtilU642Addr(uint64_t u64, char* host, size_t len, uint16_t* port);
void syncUtilnodeInfo2EpSet(const SNodeInfo* pNodeInfo, SEpSet* pEpSet);
void syncUtilraftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet);
void syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaftId* raftId);
bool syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaftId* raftId);
bool syncUtilSameId(const SRaftId* pId1, const SRaftId* pId2);
bool syncUtilEmptyId(const SRaftId* pId);
......
......@@ -20,7 +20,10 @@
SSyncIndexMgr *syncIndexMgrCreate(SSyncNode *pSyncNode) {
SSyncIndexMgr *pSyncIndexMgr = taosMemoryMalloc(sizeof(SSyncIndexMgr));
ASSERT(pSyncIndexMgr != NULL);
if (pSyncIndexMgr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
memset(pSyncIndexMgr, 0, sizeof(SSyncIndexMgr));
pSyncIndexMgr->replicas = &(pSyncNode->replicasId);
......@@ -248,4 +251,4 @@ SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftI
}
ASSERT(0);
return -1;
}
\ No newline at end of file
}
......@@ -51,15 +51,17 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths);
int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
// life cycle
static void syncFreeNode(void* param);
// ---------------------------------
static void syncNodeFreeCb(void *param) {
syncNodeClose(param);
param = NULL;
}
int32_t syncInit() {
int32_t ret = 0;
if (!syncEnvIsStart()) {
tsNodeRefId = taosOpenRef(200, syncFreeNode);
tsNodeRefId = taosOpenRef(200, syncNodeFreeCb);
if (tsNodeRefId < 0) {
sError("failed to init node ref");
syncCleanUp();
......@@ -86,11 +88,15 @@ void syncCleanUp() {
int64_t syncOpen(const SSyncInfo* pSyncInfo) {
SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo);
ASSERT(pSyncNode != NULL);
if (pSyncNode == NULL) {
sError("failed to open sync node. vgId:%d", pSyncInfo->vgId);
return -1;
}
pSyncNode->rid = taosAddRef(tsNodeRefId, pSyncNode);
if (pSyncNode->rid < 0) {
syncFreeNode(pSyncNode);
syncNodeClose(pSyncNode);
pSyncNode = NULL;
return -1;
}
......@@ -136,11 +142,9 @@ void syncStartStandBy(int64_t rid) {
void syncStop(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) return;
int32_t vgId = pSyncNode->vgId;
syncNodeClose(pSyncNode);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
taosRemoveRef(tsNodeRefId, rid);
sDebug("vgId:%d, sync rid:%" PRId64 " is removed from rsetId:%" PRId64, vgId, rid, tsNodeRefId);
}
......@@ -210,7 +214,7 @@ int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg
if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
sError("syncNodeCheckNewConfig error");
sError("invalid new config. vgId:%d", pSyncNode->vgId);
return -1;
}
......@@ -237,7 +241,7 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg) {
if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
sError("syncNodeCheckNewConfig error");
sError("invalid new config. vgId:%d", pSyncNode->vgId);
return -1;
}
......@@ -941,16 +945,18 @@ _END:
SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
SSyncInfo* pSyncInfo = (SSyncInfo*)pOldSyncInfo;
SSyncNode* pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(SSyncNode));
ASSERT(pSyncNode != NULL);
memset(pSyncNode, 0, sizeof(SSyncNode));
SSyncNode* pSyncNode = (SSyncNode*)taosMemoryCalloc(1, sizeof(SSyncNode));
if (pSyncNode == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
int32_t ret = 0;
if (!taosDirExist((char*)(pSyncInfo->path))) {
if (taosMkDir(pSyncInfo->path) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
sError("failed to create dir:%s since %s", pSyncInfo->path, terrstr());
return NULL;
goto _error;
}
}
......@@ -963,15 +969,21 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
meta.lastConfigIndex = SYNC_INDEX_INVALID;
meta.batchSize = pSyncInfo->batchSize;
ret = raftCfgCreateFile((SSyncCfg*)&(pSyncInfo->syncCfg), meta, pSyncNode->configPath);
ASSERT(ret == 0);
if (ret != 0) {
sError("failed to create raft cfg file. configPath: %s", pSyncNode->configPath);
goto _error;
}
} else {
// update syncCfg by raft_config.json
pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
ASSERT(pSyncNode->pRaftCfg != NULL);
if (pSyncNode->pRaftCfg == NULL) {
sError("failed to open raft cfg file. path:%s", pSyncNode->configPath);
goto _error;
}
pSyncInfo->syncCfg = pSyncNode->pRaftCfg->cfg;
raftCfgClose(pSyncNode->pRaftCfg);
pSyncNode->pRaftCfg = NULL;
}
// init by SSyncInfo
......@@ -988,11 +1000,17 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
// init raft config
pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
ASSERT(pSyncNode->pRaftCfg != NULL);
if (pSyncNode->pRaftCfg == NULL) {
sError("failed to open raft cfg file. path:%s", pSyncNode->configPath);
goto _error;
}
// init internal
pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex];
syncUtilnodeInfo2raftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId);
if (!syncUtilnodeInfo2raftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId)) {
sError("failed to determine my raft member id. vgId:%d", pSyncNode->vgId);
goto _error;
}
// init peersNum, peers, peersId
pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1;
......@@ -1004,17 +1022,24 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
}
}
for (int i = 0; i < pSyncNode->peersNum; ++i) {
syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]);
if (!syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i])) {
sError("failed to determine raft member id. vgId:%d, peer:%d", pSyncNode->vgId, i);
goto _error;
}
}
// init replicaNum, replicasId
pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum;
for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]);
if(!syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i])) {
sError("failed to determine raft member id. vgId:%d, replica:%d", pSyncNode->vgId, i);
goto _error;
}
}
// init raft algorithm
pSyncNode->pFsm = pSyncInfo->pFsm;
pSyncInfo->pFsm = NULL;
pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum);
pSyncNode->leaderCache = EMPTY_RAFT_ID;
......@@ -1047,29 +1072,50 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
// init TLA+ server vars
pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
pSyncNode->pRaftStore = raftStoreOpen(pSyncNode->raftStorePath);
ASSERT(pSyncNode->pRaftStore != NULL);
if (pSyncNode->pRaftStore == NULL) {
sError("failed to open raft store. path: %s", pSyncNode->raftStorePath);
goto _error;
}
// init TLA+ candidate vars
pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
ASSERT(pSyncNode->pVotesGranted != NULL);
if (pSyncNode->pVotesGranted == NULL) {
sError("failed to create VotesGranted. vgId:%d", pSyncNode->vgId);
goto _error;
}
pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
ASSERT(pSyncNode->pVotesRespond != NULL);
if (pSyncNode->pVotesRespond == NULL) {
sError("failed to create VotesRespond. vgId:%d", pSyncNode->vgId);
goto _error;
}
// init TLA+ leader vars
pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode);
ASSERT(pSyncNode->pNextIndex != NULL);
if (pSyncNode->pNextIndex == NULL) {
sError("failed to create SyncIndexMgr. vgId:%d", pSyncNode->vgId);
goto _error;
}
pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode);
ASSERT(pSyncNode->pMatchIndex != NULL);
if (pSyncNode->pMatchIndex == NULL) {
sError("failed to create SyncIndexMgr. vgId:%d", pSyncNode->vgId);
goto _error;
}
// init TLA+ log vars
pSyncNode->pLogStore = logStoreCreate(pSyncNode);
ASSERT(pSyncNode->pLogStore != NULL);
if (pSyncNode->pLogStore == NULL) {
sError("failed to create SyncLogStore. vgId:%d", pSyncNode->vgId);
goto _error;
}
SyncIndex commitIndex = SYNC_INDEX_INVALID;
if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
SSnapshot snapshot = {0};
int32_t code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
ASSERT(code == 0);
if (code != 0) {
sError("failed to get snapshot info. vgId:%d, code:%d", pSyncNode->vgId, code);
goto _error;
}
if (snapshot.lastApplyIndex > commitIndex) {
commitIndex = snapshot.lastApplyIndex;
syncNodeEventLog(pSyncNode, "reset commit index by snapshot");
......@@ -1132,7 +1178,10 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
// tools
pSyncNode->pSyncRespMgr = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS);
ASSERT(pSyncNode->pSyncRespMgr != NULL);
if (pSyncNode->pSyncRespMgr == NULL) {
sError("failed to create SyncRespMgr. vgId:%d", pSyncNode->vgId);
goto _error;
}
// restore state
pSyncNode->restoreFinish = false;
......@@ -1162,6 +1211,15 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
syncNodeEventLog(pSyncNode, "sync open");
return pSyncNode;
_error:
if (pSyncInfo->pFsm) {
taosMemoryFree(pSyncInfo->pFsm);
pSyncInfo->pFsm = NULL;
}
syncNodeClose(pSyncNode);
pSyncNode = NULL;
return NULL;
}
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
......@@ -1214,20 +1272,28 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) {
void syncNodeClose(SSyncNode* pSyncNode) {
syncNodeEventLog(pSyncNode, "sync close");
if (pSyncNode == NULL) {
return;
}
int32_t ret;
ASSERT(pSyncNode != NULL);
ret = raftStoreClose(pSyncNode->pRaftStore);
ASSERT(ret == 0);
syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
pSyncNode->pSyncRespMgr = NULL;
voteGrantedDestroy(pSyncNode->pVotesGranted);
pSyncNode->pVotesGranted = NULL;
votesRespondDestory(pSyncNode->pVotesRespond);
pSyncNode->pVotesRespond = NULL;
syncIndexMgrDestroy(pSyncNode->pNextIndex);
pSyncNode->pNextIndex = NULL;
syncIndexMgrDestroy(pSyncNode->pMatchIndex);
pSyncNode->pMatchIndex = NULL;
logStoreDestory(pSyncNode->pLogStore);
pSyncNode->pLogStore = NULL;
raftCfgClose(pSyncNode->pRaftCfg);
pSyncNode->pRaftCfg = NULL;
syncNodeStopPingTimer(pSyncNode);
syncNodeStopElectTimer(pSyncNode);
......@@ -1249,8 +1315,7 @@ void syncNodeClose(SSyncNode* pSyncNode) {
pSyncNode->pNewNodeReceiver = NULL;
}
// free memory in syncFreeNode
// taosMemoryFree(pSyncNode);
taosMemoryFree(pSyncNode);
}
// option
......@@ -2534,7 +2599,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
return;
}
} else {
sError("syncNodeEqHeartbeatTimer FpEqMsg is NULL");
sError("vgId:%d, enqueue msg cb ptr (i.e. FpEqMsg) not set.", pSyncNode->vgId);
}
syncTimeoutDestroy(pSyncMsg);
......@@ -2774,14 +2839,6 @@ int32_t syncNodeOnClientRequestBatchCb(SSyncNode* ths, SyncClientRequestBatch* p
return 0;
}
static void syncFreeNode(void* param) {
SSyncNode* pNode = param;
// inner object already free
// syncNodePrint2((char*)"==syncFreeNode==", pNode);
taosMemoryFree(pNode);
}
const char* syncStr(ESyncState state) {
switch (state) {
case TAOS_SYNC_STATE_FOLLOWER:
......
......@@ -364,8 +364,6 @@ int32_t raftCfgCreateFile(SSyncCfg *pCfg, SRaftCfgMeta meta, const char *path) {
int32_t sysErr = errno;
const char *sysErrStr = strerror(errno);
sError("create raft cfg file error, err:%d %X, msg:%s, syserr:%d, sysmsg:%s", err, err, errStr, sysErr, sysErrStr);
ASSERT(0);
return -1;
}
......
......@@ -28,7 +28,7 @@ SRaftStore *raftStoreOpen(const char *path) {
SRaftStore *pRaftStore = taosMemoryMalloc(sizeof(SRaftStore));
if (pRaftStore == NULL) {
sError("raftStoreOpen malloc error");
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
memset(pRaftStore, 0, sizeof(*pRaftStore));
......@@ -72,7 +72,9 @@ static int32_t raftStoreInit(SRaftStore *pRaftStore) {
}
int32_t raftStoreClose(SRaftStore *pRaftStore) {
ASSERT(pRaftStore != NULL);
if (pRaftStore == NULL) {
return 0;
}
taosCloseFile(&pRaftStore->pFile);
taosMemoryFree(pRaftStore);
......
......@@ -19,6 +19,10 @@
SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl) {
SSyncRespMgr *pObj = (SSyncRespMgr *)taosMemoryMalloc(sizeof(SSyncRespMgr));
if (pObj == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
memset(pObj, 0, sizeof(SSyncRespMgr));
pObj->pRespHash =
......
......@@ -35,7 +35,10 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI
SSyncSnapshotSender *pSender = NULL;
if (condition) {
pSender = taosMemoryMalloc(sizeof(SSyncSnapshotSender));
ASSERT(pSender != NULL);
if (pSender == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
memset(pSender, 0, sizeof(*pSender));
pSender->start = false;
......
......@@ -26,7 +26,8 @@ uint64_t syncUtilAddr2U64(const char* host, uint16_t port) {
uint32_t hostU32 = taosGetIpv4FromFqdn(host);
if (hostU32 == (uint32_t)-1) {
sError("Get IP address error");
sError("failed to resolve ipv4 addr. host:%s", host);
terrno = TSDB_CODE_TSC_INVALID_FQDN;
return -1;
}
......@@ -84,13 +85,18 @@ void syncUtilraftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet) {
addEpIntoEpSet(pEpSet, host, port);
}
void syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaftId* raftId) {
bool syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaftId* raftId) {
uint32_t ipv4 = taosGetIpv4FromFqdn(pNodeInfo->nodeFqdn);
ASSERT(ipv4 != 0xFFFFFFFF);
if (ipv4 == 0xFFFFFFFF || ipv4 == 1) {
sError("failed to resolve ipv4 addr. fqdn: %s", pNodeInfo->nodeFqdn);
terrno = TSDB_CODE_TSC_INVALID_FQDN;
return false;
}
char ipbuf[128] = {0};
tinet_ntoa(ipbuf, ipv4);
raftId->addr = syncUtilAddr2U64(ipbuf, pNodeInfo->nodePort);
raftId->vgId = vgId;
return true;
}
bool syncUtilSameId(const SRaftId* pId1, const SRaftId* pId2) {
......@@ -310,4 +316,4 @@ void syncUtilJson2Line(char* jsonStr) {
q++;
}
}
}
\ No newline at end of file
}
......@@ -24,7 +24,10 @@ static void voteGrantedClearVotes(SVotesGranted *pVotesGranted) {
SVotesGranted *voteGrantedCreate(SSyncNode *pSyncNode) {
SVotesGranted *pVotesGranted = taosMemoryMalloc(sizeof(SVotesGranted));
ASSERT(pVotesGranted != NULL);
if (pVotesGranted == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
memset(pVotesGranted, 0, sizeof(SVotesGranted));
pVotesGranted->replicas = &(pSyncNode->replicasId);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册