diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index df4b526775539996f1895cf34f334367a01e9f9e..6853ff5ccc2da80421af146fb5fbcc21ddeea0c0 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -17,11 +17,44 @@ #include "mndSync.h" #include "mndTrans.h" +static int32_t mndSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { + if (pMsg == NULL || pMsg->pCont == NULL) { + return -1; + } + + SMsgHead *pHead = pMsg->pCont; + pHead->contLen = htonl(pHead->contLen); + pHead->vgId = htonl(pHead->vgId); + + if (msgcb == NULL || msgcb->putToQueueFp == NULL) { + rpcFreeCont(pMsg->pCont); + pMsg->pCont = NULL; + return -1; + } + + int32_t code = tmsgPutToQueue(msgcb, SYNC_CTRL_QUEUE, pMsg); + if (code != 0) { + rpcFreeCont(pMsg->pCont); + pMsg->pCont = NULL; + } + return code; +} + static int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { + if (pMsg == NULL || pMsg->pCont == NULL) { + return -1; + } + SMsgHead *pHead = pMsg->pCont; pHead->contLen = htonl(pHead->contLen); pHead->vgId = htonl(pHead->vgId); + if (msgcb == NULL || msgcb->putToQueueFp == NULL) { + rpcFreeCont(pMsg->pCont); + pMsg->pCont = NULL; + return -1; + } + int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); if (code != 0) { rpcFreeCont(pMsg->pCont); @@ -212,7 +245,7 @@ int32_t mndInitSync(SMnode *pMnode) { .msgcb = NULL, .FpSendMsg = mndSyncSendMsg, .FpEqMsg = mndSyncEqMsg, - .FpEqCtrlMsg = NULL, + .FpEqCtrlMsg = mndSyncEqCtrlMsg, }; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 7acf5b4003bf4b24045c2eeb5b00b495e76be74e..4af89dfa38d40f8a05707e5364a8dbebeeae28a7 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -365,7 +365,13 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { } static int32_t vnodeSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { - if (msgcb == NULL) { + if (pMsg == NULL || pMsg->pCont == NULL) { + return -1; + } + + if (msgcb == NULL || msgcb->putToQueueFp == NULL) { + rpcFreeCont(pMsg->pCont); + pMsg->pCont = NULL; return -1; } @@ -378,7 +384,13 @@ static int32_t vnodeSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { } static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { - if (msgcb == NULL) { + if (pMsg == NULL || pMsg->pCont == NULL) { + return -1; + } + + if (msgcb == NULL || msgcb->putToQueueFp == NULL) { + rpcFreeCont(pMsg->pCont); + pMsg->pCont = NULL; return -1; } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 82ae8d7e51d87e853cf7939c228bc0808e6180a9..b4d939e98008b8a743912a27d0e0694bd2865c3a 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -508,7 +508,7 @@ int32_t syncEndSnapshot(int64_t rid) { SSyncLogStoreData* pData = pSyncNode->pLogStore->data; code = walEndSnapshot(pData->pWal); if (code != 0) { - sError("vgId:%d, wal snapshot end error since:%s", pSyncNode->vgId, terrstr(terrno)); + sError("vgId:%d, wal snapshot end error since:%s", pSyncNode->vgId, terrstr()); taosReleaseRef(tsNodeRefId, pSyncNode->rid); return -1; @@ -2793,7 +2793,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { pSyncNode->vgId, pSyncNode); SRpcMsg rpcMsg; syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); - if (pSyncNode->FpEqMsg != NULL) { + if (pSyncNode->FpEqMsg != NULL && pSyncNode->msgcb != NULL && pSyncNode->msgcb->putToQueueFp != NULL) { int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg); if (code != 0) { sError("vgId:%d, sync enqueue elect msg error, code:%d", pSyncNode->vgId, code); @@ -3379,8 +3379,8 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde // ASSERT(code == 0); // ASSERT(pEntry != NULL); if (code != 0 || pEntry == NULL) { - syncNodeErrorLog(ths, "get log entry error"); - sFatal("vgId:%d, get log entry %" PRId64 " error when commit since %s", ths->vgId, i, terrstr()); + syncNodeErrorLog(ths, "get log entry error"); + sFatal("vgId:%d, get log entry %" PRId64 " error when commit since %s", ths->vgId, i, terrstr()); continue; } } diff --git a/source/libs/sync/src/syncTimeout.c b/source/libs/sync/src/syncTimeout.c index 5ff73a6406d87234944711fa50d15e38fa24e5c7..1c082170995041c8ee86642331cf4d76496022d3 100644 --- a/source/libs/sync/src/syncTimeout.c +++ b/source/libs/sync/src/syncTimeout.c @@ -76,7 +76,7 @@ int32_t syncNodeTimerRoutine(SSyncNode* ths) { SSyncLogStoreData* pData = ths->pLogStore->data; int32_t code = walEndSnapshot(pData->pWal); if (code != 0) { - sError("vgId:%d, wal snapshot end error since:%s", ths->vgId, terrstr(terrno)); + sError("vgId:%d, timer wal snapshot end error since:%s", ths->vgId, terrstr()); return -1; } else { do {