diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index e33e92cae8e5ec46cab96f1f08a2737ab760ac62..73172d233cf554aeb30fd2ce6a3eb6fe6888cd57 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -214,7 +214,23 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf // todo SRpcMsg *pRsp = NULL; - (void)vnodeProcessSyncReq(pVnode->pImpl, &pMsg->rpcMsg, &pRsp); + int32_t ret = vnodeProcessSyncReq(pVnode->pImpl, &pMsg->rpcMsg, &pRsp); + if (ret != 0) { + // if leader, send response + if (pMsg->rpcMsg.handle != NULL) { + SRpcMsg rsp; + rsp.pCont = NULL; + rsp.contLen = 0; + + rsp.code = terrno; + dTrace("vmProcessSyncQueue error, code:%d", terrno); + + rsp.ahandle = pMsg->rpcMsg.ahandle; + rsp.handle = pMsg->rpcMsg.handle; + rsp.refId = pMsg->rpcMsg.refId; + tmsgSendRsp(&rsp); + } + } rpcFreeCont(pMsg->rpcMsg.pCont); taosFreeQitem(pMsg); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index fc2b6fe67682cc5b1c182164e3d78e49bf24a34d..aab027e15c72ded44dacea387639f73b943ca342 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -199,6 +199,8 @@ void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) { } int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { + int32_t ret = TAOS_SYNC_PROPOSE_OTHER_ERROR; + if (syncEnvIsStart()) { SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync); assert(pSyncNode != NULL); @@ -220,67 +222,70 @@ int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg); assert(pSyncMsg != NULL); - syncNodeOnTimeoutCb(pSyncNode, pSyncMsg); + ret = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg); syncTimeoutDestroy(pSyncMsg); } else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING) { SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg); assert(pSyncMsg != NULL); - syncNodeOnPingCb(pSyncNode, pSyncMsg); + ret = syncNodeOnPingCb(pSyncNode, pSyncMsg); syncPingDestroy(pSyncMsg); } else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING_REPLY) { SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg); assert(pSyncMsg != NULL); - syncNodeOnPingReplyCb(pSyncNode, pSyncMsg); + ret = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg); syncPingReplyDestroy(pSyncMsg); } else if (pRpcMsg->msgType == TDMT_VND_SYNC_CLIENT_REQUEST) { SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg); assert(pSyncMsg != NULL); - syncNodeOnClientRequestCb(pSyncNode, pSyncMsg); + ret = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg); syncClientRequestDestroy(pSyncMsg); } else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE) { SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg); assert(pSyncMsg != NULL); - syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg); + ret = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg); syncRequestVoteDestroy(pSyncMsg); } else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE_REPLY) { SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg); assert(pSyncMsg != NULL); - syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg); + ret = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg); syncRequestVoteReplyDestroy(pSyncMsg); } else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES) { SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg); assert(pSyncMsg != NULL); - syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg); + ret = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg); syncAppendEntriesDestroy(pSyncMsg); } else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES_REPLY) { SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg); assert(pSyncMsg != NULL); - syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg); + ret = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg); syncAppendEntriesReplyDestroy(pSyncMsg); } else { vError("==vnodeProcessSyncReq== error msg type:%d", pRpcMsg->msgType); + ret = TAOS_SYNC_PROPOSE_OTHER_ERROR; } syncNodeRelease(pSyncNode); } else { vError("==vnodeProcessSyncReq== error syncEnv stop"); + ret = TAOS_SYNC_PROPOSE_OTHER_ERROR; } - return 0; + + return ret; } static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp) {