diff --git a/include/common/tmsgcb.h b/include/common/tmsgcb.h index c13c50e1616146e43d3a74dc4145a5e7c7ffccf3..b5b997dac0a18c6ca83e07d5f5c597004df8a913 100644 --- a/include/common/tmsgcb.h +++ b/include/common/tmsgcb.h @@ -34,6 +34,7 @@ typedef enum { WRITE_QUEUE, APPLY_QUEUE, SYNC_QUEUE, + SYNC_CTRL_QUEUE, STREAM_QUEUE, QUEUE_MAX, } EQueueType; diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index de2271554d2b9166ec240ea07e91dea9d017ef92..b2c743831a225a6c1ae1f9363ca0f93b014ddfb6 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -695,6 +695,9 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie int32_t syncNodeOnSnapshotSendCb(SSyncNode* ths, SyncSnapshotSend* pMsg); int32_t syncNodeOnSnapshotRspCb(SSyncNode* ths, SyncSnapshotRsp* pMsg); +int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg); +int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg); + // ----------------------------------------- typedef int32_t (*FpOnPingCb)(SSyncNode* ths, SyncPing* pMsg); typedef int32_t (*FpOnPingReplyCb)(SSyncNode* ths, SyncPingReply* pMsg); diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index 30f54831987999daccac93f5b6cbd0ccc2dfbf28..32649464c1561011695510ed1094bb0b5cf96091 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -34,6 +34,7 @@ typedef struct SVnodeMgmt { SQWorkerPool streamPool; SWWorkerPool fetchPool; SWWorkerPool syncPool; + SWWorkerPool syncCtrlPool; SWWorkerPool writePool; SWWorkerPool applyPool; SSingleWorker mgmtWorker; @@ -60,6 +61,7 @@ typedef struct { SVnode *pImpl; STaosQueue *pWriteQ; STaosQueue *pSyncQ; + STaosQueue *pSyncCtrlQ; STaosQueue *pApplyQ; STaosQueue *pQueryQ; STaosQueue *pStreamQ; @@ -106,6 +108,7 @@ int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc); int32_t vmPutMsgToWriteQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToSyncQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t vmPutMsgToSyncCtrlQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToQueryQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToStreamQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 0e53b7ca80fad5db6fedec880e495965b90e25a6..fe9f114084d17f7edd82e5bda558703c9e5a8ceb 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -422,6 +422,8 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_SET_VNODE_STANDBY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, vmPutMsgToSyncCtrlQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncCtrlQueue, 0) == NULL) goto _OVER; code = 0; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index d7df30bc7587cada18a308176e2fc739c1679afa..e08f5963cdb877f21252e08b180d4c5c47fa39ab 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -136,6 +136,22 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf } } +static void vmProcessSyncCtrlQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { + SVnodeObj *pVnode = pInfo->ahandle; + SRpcMsg *pMsg = NULL; + + for (int32_t i = 0; i < numOfMsgs; ++i) { + if (taosGetQitem(qall, (void **)&pMsg) == 0) continue; + const STraceId *trace = &pMsg->info.traceId; + dGTrace("vgId:%d, msg:%p get from vnode-sync queue", pVnode->vgId, pMsg); + + int32_t code = vnodeProcessSyncCtrlMsg(pVnode->pImpl, pMsg, NULL); // no response here + dGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code); + rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); + } +} + static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtype) { const STraceId *trace = &pMsg->info.traceId; SMsgHead *pHead = pMsg->pCont; @@ -203,6 +219,10 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp dGTrace("vgId:%d, msg:%p put into vnode-sync queue", pVnode->vgId, pMsg); taosWriteQitem(pVnode->pSyncQ, pMsg); break; + case SYNC_CTRL_QUEUE: + dGTrace("vgId:%d, msg:%p put into vnode-sync-ctrl queue", pVnode->vgId, pMsg); + taosWriteQitem(pVnode->pSyncCtrlQ, pMsg); + break; case APPLY_QUEUE: dGTrace("vgId:%d, msg:%p put into vnode-apply queue", pVnode->vgId, pMsg); taosWriteQitem(pVnode->pApplyQ, pMsg); @@ -219,6 +239,10 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp int32_t vmPutMsgToSyncQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, SYNC_QUEUE); } +int32_t vmPutMsgToSyncCtrlQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { + return vmPutMsgToQueue(pMgmt, pMsg, SYNC_CTRL_QUEUE); +} + int32_t vmPutMsgToWriteQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, WRITE_QUEUE); } int32_t vmPutMsgToQueryQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, QUERY_QUEUE); } @@ -301,6 +325,7 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) { int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode->pImpl, (FItems)vnodeProposeWriteMsg); pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)vmProcessSyncQueue); + pVnode->pSyncCtrlQ = tWWorkerAllocQueue(&pMgmt->syncCtrlPool, pVnode, (FItems)vmProcessSyncCtrlQueue); pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->applyPool, pVnode->pImpl, (FItems)vnodeApplyWriteMsg); pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue); pVnode->pStreamQ = tQWorkerAllocQueue(&pMgmt->streamPool, pVnode, (FItem)vmProcessStreamQueue); @@ -325,6 +350,7 @@ void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ); tWWorkerFreeQueue(&pMgmt->applyPool, pVnode->pApplyQ); tWWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ); + tWWorkerFreeQueue(&pMgmt->syncCtrlPool, pVnode->pSyncCtrlQ); tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ); tQWorkerFreeQueue(&pMgmt->streamPool, pVnode->pStreamQ); tWWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ); @@ -370,6 +396,11 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) { pSPool->max = tsNumOfVnodeSyncThreads; if (tWWorkerInit(pSPool) != 0) return -1; + SWWorkerPool *pSCPool = &pMgmt->syncCtrlPool; + pSCPool->name = "vnode-sync-ctrl"; + pSCPool->max = tsNumOfVnodeSyncThreads; + if (tWWorkerInit(pSCPool) != 0) return -1; + SSingleWorkerCfg mgmtCfg = { .min = 1, .max = 1, @@ -398,6 +429,7 @@ void vmStopWorker(SVnodeMgmt *pMgmt) { tWWorkerCleanup(&pMgmt->writePool); tWWorkerCleanup(&pMgmt->applyPool); tWWorkerCleanup(&pMgmt->syncPool); + tWWorkerCleanup(&pMgmt->syncCtrlPool); tQWorkerCleanup(&pMgmt->queryPool); tQWorkerCleanup(&pMgmt->streamPool); tWWorkerCleanup(&pMgmt->fetchPool); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 54d5d9eec25e0a6fa8c6a80961c9a1ea6b5ee4e1..96ba9a06028b12c26df751a7ae1dc246148fe2e7 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -82,6 +82,7 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg); int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp); int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); +int32_t vnodeProcessSyncCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg); int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index ab7fb001f017e2f6917d1407aa102fbb86434b02..44eee79574fb7dd61c32807bbd70fb233e77c46d 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -323,6 +323,49 @@ void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { } } +int32_t vnodeProcessSyncCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { + int32_t code = 0; + const STraceId *trace = &pMsg->info.traceId; + + if (!syncEnvIsStart()) { + vGError("vgId:%d, msg:%p failed to process since sync env not start", pVnode->config.vgId); + terrno = TSDB_CODE_APP_ERROR; + return -1; + } + + SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync); + if (pSyncNode == NULL) { + vGError("vgId:%d, msg:%p failed to process since invalid sync node", pVnode->config.vgId); + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return -1; + } + + vGTrace("vgId:%d, sync msg:%p will be processed, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType)); + + if (pMsg->msgType == TDMT_SYNC_HEARTBEAT) { + SyncHeartbeat *pSyncMsg = syncHeartbeatFromRpcMsg2(pMsg); + code = syncNodeOnHeartbeat(pSyncNode, pSyncMsg); + syncHeartbeatDestroy(pSyncMsg); + + } else if (pMsg->msgType == TDMT_SYNC_HEARTBEAT_REPLY) { + SyncHeartbeatReply *pSyncMsg = syncHeartbeatReplyFromRpcMsg2(pMsg); + code = syncNodeOnHeartbeatReply(pSyncNode, pSyncMsg); + syncHeartbeatReplyDestroy(pSyncMsg); + + } else { + vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg->msgType); + code = -1; + } + + vTrace("vgId:%d, sync msg:%p is processed, type:%s code:0x%x", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType), + code); + syncNodeRelease(pSyncNode); + if (code != 0 && terrno == 0) { + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + } + return code; +} + int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { int32_t code = 0; const STraceId *trace = &pMsg->info.traceId; diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 101e99afea8db60011fa535a53320a32c2dd376c..4e2a8647b2078eb75febaf04e7e46ea8183dbc8d 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -287,6 +287,12 @@ void syncLogRecvAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntries void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s); void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s); +void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s); +void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s); + +void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s); +void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s); + // for debug -------------- void syncNodePrint(SSyncNode* pObj); void syncNodePrint2(char* s, SSyncNode* pObj); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 6e4eaa0aaf71908eea798a6916dda5a5e4642ec2..8ccfce603d6aefec8ff0228a09b96902839e2aa8 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -2282,7 +2282,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) { void syncNodeCandidate2Leader(SSyncNode* pSyncNode) { ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE); - //ASSERT(voteGrantedMajority(pSyncNode->pVotesGranted)); + // ASSERT(voteGrantedMajority(pSyncNode->pVotesGranted)); syncNodeBecomeLeader(pSyncNode, "candidate to leader"); syncNodeLog2("==state change syncNodeCandidate2Leader==", pSyncNode); @@ -2676,7 +2676,7 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) { if (ths->state == TAOS_SYNC_STATE_LEADER) { int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry); if (code != 0) { - sError("vgId:%d, failed to append log entry since %s", ths->vgId, tstrerror(terrno)); + sError("vgId:%d, failed to append log entry since %s", ths->vgId, tstrerror(terrno)); return -1; } syncNodeReplicate(ths, false); @@ -2726,6 +2726,55 @@ int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) { return ret; } +int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) { + syncLogRecvHeartbeat(ths, pMsg, ""); + + SyncHeartbeatReply* pMsgReply = syncHeartbeatReplyBuild(ths->vgId); + pMsgReply->destId = pMsg->srcId; + pMsgReply->srcId = ths->myRaftId; + pMsgReply->term = ths->pRaftStore->currentTerm; + pMsgReply->privateTerm = 8864; // magic number + + SRpcMsg rpcMsg; + syncHeartbeatReply2RpcMsg(pMsgReply, &rpcMsg); + + if (pMsg->term >= ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) { + syncNodeBecomeFollower(ths, "become follower by hb"); + } + + if (pMsg->term == ths->pRaftStore->currentTerm) { + // sInfo("vgId:%d, heartbeat reset timer", ths->vgId); + syncNodeResetElectTimer(ths); + +#if 0 + if (ths->state == TAOS_SYNC_STATE_FOLLOWER) { + syncNodeFollowerCommit(ths, pMsg->commitIndex); + } +#endif + } + + /* + // htonl + SMsgHead* pHead = rpcMsg.pCont; + pHead->contLen = htonl(pHead->contLen); + pHead->vgId = htonl(pHead->vgId); + */ + + // reply + syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg); + + return 0; +} + +int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg) { + syncLogRecvHeartbeatReply(ths, pMsg, ""); + + // update last reply time, make decision whether the other node is alive or not + syncIndexMgrSetRecvTime(ths->pMatchIndex, &(pMsg->destId), pMsg->startTime); + + return 0; +} + // TLA+ Spec // ClientRequest(i, v) == // /\ state[i] = Leader @@ -2754,7 +2803,7 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncI code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry); if (code != 0) { // del resp mgr, call FpCommitCb - sError("vgId:%d, failed to append log entry since %s", ths->vgId, tstrerror(terrno)); + sError("vgId:%d, failed to append log entry since %s", ths->vgId, tstrerror(terrno)); return -1; } @@ -3073,9 +3122,9 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, } else { code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, i, &pEntry); if (code != 0) { - sError("vgId:%d, failed to get log entry since %s. index:%lld", ths->vgId, tstrerror(terrno), i); - return -1; - } + sError("vgId:%d, failed to get log entry since %s. index:%lld", ths->vgId, tstrerror(terrno), i); + return -1; + } ASSERT(pEntry != NULL); } @@ -3337,3 +3386,45 @@ void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntries host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s); syncNodeEventLog(pSyncNode, logBuf); } + +void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "send sync-heartbeat from %s:%d {term:%" PRIu64 ", cmt:%" PRIu64 ", pterm:%" PRIu64 "}, %s", host, port, + pMsg->term, pMsg->commitIndex, pMsg->privateTerm, s); + syncNodeEventLog(pSyncNode, logBuf); +} + +void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "recv sync-heartbeat from %s:%d {term:%" PRIu64 ", cmt:%" PRIu64 ", pterm:%" PRIu64 "}, %s", host, port, + pMsg->term, pMsg->commitIndex, pMsg->privateTerm, s); + syncNodeEventLog(pSyncNode, logBuf); +} + +void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), "send sync-heartbeat-reply from %s:%d {term:%" PRIu64 ", pterm:%" PRIu64 "}, %s", + host, port, pMsg->term, pMsg->privateTerm, s); + syncNodeEventLog(pSyncNode, logBuf); +} + +void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), "recv sync-heartbeat-reply from %s:%d {term:%" PRIu64 ", pterm:%" PRIu64 "}, %s", + host, port, pMsg->term, pMsg->privateTerm, s); + syncNodeEventLog(pSyncNode, logBuf); +} \ No newline at end of file