提交 d246514d 编写于 作者: M Minghao Li

refactor(sync): check msgcb, putToQueueFp NULL

上级 6b744129
...@@ -17,11 +17,44 @@ ...@@ -17,11 +17,44 @@
#include "mndSync.h" #include "mndSync.h"
#include "mndTrans.h" #include "mndTrans.h"
static int32_t mndSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
if (pMsg == NULL || pMsg->pCont == NULL) {
return -1;
}
SMsgHead *pHead = pMsg->pCont;
pHead->contLen = htonl(pHead->contLen);
pHead->vgId = htonl(pHead->vgId);
if (msgcb == NULL || msgcb->putToQueueFp == NULL) {
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
return -1;
}
int32_t code = tmsgPutToQueue(msgcb, SYNC_CTRL_QUEUE, pMsg);
if (code != 0) {
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
}
return code;
}
static int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { static int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
if (pMsg == NULL || pMsg->pCont == NULL) {
return -1;
}
SMsgHead *pHead = pMsg->pCont; SMsgHead *pHead = pMsg->pCont;
pHead->contLen = htonl(pHead->contLen); pHead->contLen = htonl(pHead->contLen);
pHead->vgId = htonl(pHead->vgId); pHead->vgId = htonl(pHead->vgId);
if (msgcb == NULL || msgcb->putToQueueFp == NULL) {
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
return -1;
}
int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
if (code != 0) { if (code != 0) {
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
...@@ -212,7 +245,7 @@ int32_t mndInitSync(SMnode *pMnode) { ...@@ -212,7 +245,7 @@ int32_t mndInitSync(SMnode *pMnode) {
.msgcb = NULL, .msgcb = NULL,
.FpSendMsg = mndSyncSendMsg, .FpSendMsg = mndSyncSendMsg,
.FpEqMsg = mndSyncEqMsg, .FpEqMsg = mndSyncEqMsg,
.FpEqCtrlMsg = NULL, .FpEqCtrlMsg = mndSyncEqCtrlMsg,
}; };
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP);
......
...@@ -365,7 +365,13 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -365,7 +365,13 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
} }
static int32_t vnodeSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { static int32_t vnodeSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
if (msgcb == NULL) { if (pMsg == NULL || pMsg->pCont == NULL) {
return -1;
}
if (msgcb == NULL || msgcb->putToQueueFp == NULL) {
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
return -1; return -1;
} }
...@@ -378,7 +384,13 @@ static int32_t vnodeSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { ...@@ -378,7 +384,13 @@ static int32_t vnodeSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
} }
static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
if (msgcb == NULL) { if (pMsg == NULL || pMsg->pCont == NULL) {
return -1;
}
if (msgcb == NULL || msgcb->putToQueueFp == NULL) {
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
return -1; return -1;
} }
......
...@@ -508,7 +508,7 @@ int32_t syncEndSnapshot(int64_t rid) { ...@@ -508,7 +508,7 @@ int32_t syncEndSnapshot(int64_t rid) {
SSyncLogStoreData* pData = pSyncNode->pLogStore->data; SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
code = walEndSnapshot(pData->pWal); code = walEndSnapshot(pData->pWal);
if (code != 0) { if (code != 0) {
sError("vgId:%d, wal snapshot end error since:%s", pSyncNode->vgId, terrstr(terrno)); sError("vgId:%d, wal snapshot end error since:%s", pSyncNode->vgId, terrstr());
taosReleaseRef(tsNodeRefId, pSyncNode->rid); taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return -1; return -1;
...@@ -2793,7 +2793,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { ...@@ -2793,7 +2793,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) {
pSyncNode->vgId, pSyncNode); pSyncNode->vgId, pSyncNode);
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
if (pSyncNode->FpEqMsg != NULL) { if (pSyncNode->FpEqMsg != NULL && pSyncNode->msgcb != NULL && pSyncNode->msgcb->putToQueueFp != NULL) {
int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg); int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
if (code != 0) { if (code != 0) {
sError("vgId:%d, sync enqueue elect msg error, code:%d", pSyncNode->vgId, code); sError("vgId:%d, sync enqueue elect msg error, code:%d", pSyncNode->vgId, code);
...@@ -3379,8 +3379,8 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde ...@@ -3379,8 +3379,8 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde
// ASSERT(code == 0); // ASSERT(code == 0);
// ASSERT(pEntry != NULL); // ASSERT(pEntry != NULL);
if (code != 0 || pEntry == NULL) { if (code != 0 || pEntry == NULL) {
syncNodeErrorLog(ths, "get log entry error"); syncNodeErrorLog(ths, "get log entry error");
sFatal("vgId:%d, get log entry %" PRId64 " error when commit since %s", ths->vgId, i, terrstr()); sFatal("vgId:%d, get log entry %" PRId64 " error when commit since %s", ths->vgId, i, terrstr());
continue; continue;
} }
} }
......
...@@ -76,7 +76,7 @@ int32_t syncNodeTimerRoutine(SSyncNode* ths) { ...@@ -76,7 +76,7 @@ int32_t syncNodeTimerRoutine(SSyncNode* ths) {
SSyncLogStoreData* pData = ths->pLogStore->data; SSyncLogStoreData* pData = ths->pLogStore->data;
int32_t code = walEndSnapshot(pData->pWal); int32_t code = walEndSnapshot(pData->pWal);
if (code != 0) { if (code != 0) {
sError("vgId:%d, wal snapshot end error since:%s", ths->vgId, terrstr(terrno)); sError("vgId:%d, timer wal snapshot end error since:%s", ths->vgId, terrstr());
return -1; return -1;
} else { } else {
do { do {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册