未验证 提交 009049c8 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #12899 from taosdata/fix/mnode

fix: avoid memory leak in mnode sync
......@@ -58,8 +58,14 @@ static void mmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
static void mmProcessSyncQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
SMnodeMgmt *pMgmt = pInfo->ahandle;
dTrace("msg:%p, get from mnode-sync queue", pMsg);
pMsg->info.node = pMgmt->pMnode;
mndProcessSyncMsg(pMsg);
int32_t code = mndProcessSyncMsg(pMsg);
dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
static int32_t mmPutNodeMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pMsg) {
......
......@@ -346,95 +346,65 @@ void mndStop(SMnode *pMnode) {
}
int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node;
void *ahandle = pMsg->info.ahandle;
int32_t ret = TAOS_SYNC_PROPOSE_OTHER_ERROR;
if (syncEnvIsStart()) {
SSyncNode *pSyncNode = syncNodeAcquire(pMnode->syncMgmt.sync);
assert(pSyncNode != NULL);
ESyncState state = syncGetMyRole(pMnode->syncMgmt.sync);
SyncTerm currentTerm = syncGetMyTerm(pMnode->syncMgmt.sync);
SMsgHead *pHead = pMsg->pCont;
char logBuf[512];
char *syncNodeStr = sync2SimpleStr(pMnode->syncMgmt.sync);
snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);
syncRpcMsgLog2(logBuf, pMsg);
taosMemoryFree(syncNodeStr);
SRpcMsg *pRpcMsg = pMsg;
if (pRpcMsg->msgType == TDMT_VND_SYNC_TIMEOUT) {
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
syncTimeoutDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING) {
SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnPingCb(pSyncNode, pSyncMsg);
syncPingDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING_REPLY) {
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
syncPingReplyDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_CLIENT_REQUEST) {
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
SMnode *pMnode = pMsg->info.node;
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
int32_t code = TAOS_SYNC_PROPOSE_OTHER_ERROR;
ret = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg);
syncClientRequestDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE) {
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
syncRequestVoteDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE_REPLY) {
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
syncRequestVoteReplyDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES) {
SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
syncAppendEntriesDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES_REPLY) {
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
syncAppendEntriesReplyDestroy(pSyncMsg);
if (!syncEnvIsStart()) {
mError("failed to process sync msg:%p type:%s since syncEnv stop", pMsg, TMSG_INFO(pMsg->msgType));
return TAOS_SYNC_PROPOSE_OTHER_ERROR;
}
} else {
mError("==mndProcessSyncMsg== error msg type:%d", pRpcMsg->msgType);
ret = TAOS_SYNC_PROPOSE_OTHER_ERROR;
}
SSyncNode *pSyncNode = syncNodeAcquire(pMgmt->sync);
if (pSyncNode == NULL) {
mError("failed to process sync msg:%p type:%s since syncNode is null", pMsg, TMSG_INFO(pMsg->msgType));
return TAOS_SYNC_PROPOSE_OTHER_ERROR;
}
syncNodeRelease(pSyncNode);
char logBuf[512];
char *syncNodeStr = sync2SimpleStr(pMgmt->sync);
snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);
syncRpcMsgLog2(logBuf, pMsg);
taosMemoryFree(syncNodeStr);
if (pMsg->msgType == TDMT_VND_SYNC_TIMEOUT) {
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
syncTimeoutDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_VND_SYNC_PING) {
SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg);
code = syncNodeOnPingCb(pSyncNode, pSyncMsg);
syncPingDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_VND_SYNC_PING_REPLY) {
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg);
code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
syncPingReplyDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_VND_SYNC_CLIENT_REQUEST) {
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg);
syncClientRequestDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE) {
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
code = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
syncRequestVoteDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE_REPLY) {
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg);
code = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
syncRequestVoteReplyDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES) {
SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg);
code = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
syncAppendEntriesDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES_REPLY) {
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
code = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
syncAppendEntriesReplyDestroy(pSyncMsg);
} else {
mError("==mndProcessSyncMsg== error syncEnv stop");
ret = TAOS_SYNC_PROPOSE_OTHER_ERROR;
mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType));
code = TAOS_SYNC_PROPOSE_OTHER_ERROR;
}
return ret;
return code;
}
int32_t mndProcessMsg(SRpcMsg *pMsg) {
......
......@@ -210,6 +210,7 @@ void syncTimeoutFromRpcMsg(const SRpcMsg* pRpcMsg, SyncTimeout* pMsg) {
SyncTimeout* syncTimeoutFromRpcMsg2(const SRpcMsg* pRpcMsg) {
SyncTimeout* pMsg = syncTimeoutDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
assert(pMsg != NULL);
return pMsg;
}
......@@ -436,6 +437,7 @@ void syncPingFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPing* pMsg) {
SyncPing* syncPingFromRpcMsg2(const SRpcMsg* pRpcMsg) {
SyncPing* pMsg = syncPingDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
assert(pMsg != NULL);
return pMsg;
}
......@@ -695,6 +697,7 @@ void syncPingReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPingReply* pMsg) {
SyncPingReply* syncPingReplyFromRpcMsg2(const SRpcMsg* pRpcMsg) {
SyncPingReply* pMsg = syncPingReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
assert(pMsg != NULL);
return pMsg;
}
......@@ -861,6 +864,7 @@ void syncClientRequestFromRpcMsg(const SRpcMsg* pRpcMsg, SyncClientRequest* pMsg
// step 3. RpcMsg => SyncClientRequest, from queue
SyncClientRequest* syncClientRequestFromRpcMsg2(const SRpcMsg* pRpcMsg) {
SyncClientRequest* pMsg = syncClientRequestDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
assert(pMsg != NULL);
return pMsg;
}
......@@ -986,6 +990,7 @@ void syncRequestVoteFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVote* pMsg) {
SyncRequestVote* syncRequestVoteFromRpcMsg2(const SRpcMsg* pRpcMsg) {
SyncRequestVote* pMsg = syncRequestVoteDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
assert(pMsg != NULL);
return pMsg;
}
......@@ -1134,6 +1139,7 @@ void syncRequestVoteReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVoteReply
SyncRequestVoteReply* syncRequestVoteReplyFromRpcMsg2(const SRpcMsg* pRpcMsg) {
SyncRequestVoteReply* pMsg = syncRequestVoteReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
assert(pMsg != NULL);
return pMsg;
}
......@@ -1281,6 +1287,7 @@ void syncAppendEntriesFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntries* pMsg
SyncAppendEntries* syncAppendEntriesFromRpcMsg2(const SRpcMsg* pRpcMsg) {
SyncAppendEntries* pMsg = syncAppendEntriesDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
assert(pMsg != NULL);
return pMsg;
}
......@@ -1444,6 +1451,7 @@ void syncAppendEntriesReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntriesR
SyncAppendEntriesReply* syncAppendEntriesReplyFromRpcMsg2(const SRpcMsg* pRpcMsg) {
SyncAppendEntriesReply* pMsg = syncAppendEntriesReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
assert(pMsg != NULL);
return pMsg;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册