diff --git a/include/common/tmsgcb.h b/include/common/tmsgcb.h index c13c50e1616146e43d3a74dc4145a5e7c7ffccf3..b5b997dac0a18c6ca83e07d5f5c597004df8a913 100644 --- a/include/common/tmsgcb.h +++ b/include/common/tmsgcb.h @@ -34,6 +34,7 @@ typedef enum { WRITE_QUEUE, APPLY_QUEUE, SYNC_QUEUE, + SYNC_CTRL_QUEUE, STREAM_QUEUE, QUEUE_MAX, } EQueueType; diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 58c9b308901f6e9ed1bc8e871dd25997efe2fd15..382c0e6218027808ae15bd73283052d0cbba5d39 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -207,6 +207,7 @@ typedef struct SSyncInfo { SMsgCb* msgcb; int32_t (*FpSendMsg)(const SEpSet* pEpSet, SRpcMsg* pMsg); int32_t (*FpEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg); + int32_t (*FpEqCtrlMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg); } SSyncInfo; int32_t syncInit(); diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index de2271554d2b9166ec240ea07e91dea9d017ef92..b2c743831a225a6c1ae1f9363ca0f93b014ddfb6 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -695,6 +695,9 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie int32_t syncNodeOnSnapshotSendCb(SSyncNode* ths, SyncSnapshotSend* pMsg); int32_t syncNodeOnSnapshotRspCb(SSyncNode* ths, SyncSnapshotRsp* pMsg); +int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg); +int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg); + // ----------------------------------------- typedef int32_t (*FpOnPingCb)(SSyncNode* ths, SyncPing* pMsg); typedef int32_t (*FpOnPingReplyCb)(SSyncNode* ths, SyncPingReply* pMsg); diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index 30f54831987999daccac93f5b6cbd0ccc2dfbf28..32649464c1561011695510ed1094bb0b5cf96091 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -34,6 +34,7 @@ typedef struct SVnodeMgmt { SQWorkerPool streamPool; SWWorkerPool fetchPool; SWWorkerPool syncPool; + SWWorkerPool syncCtrlPool; SWWorkerPool writePool; SWWorkerPool applyPool; SSingleWorker mgmtWorker; @@ -60,6 +61,7 @@ typedef struct { SVnode *pImpl; STaosQueue *pWriteQ; STaosQueue *pSyncQ; + STaosQueue *pSyncCtrlQ; STaosQueue *pApplyQ; STaosQueue *pQueryQ; STaosQueue *pStreamQ; @@ -106,6 +108,7 @@ int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc); int32_t vmPutMsgToWriteQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToSyncQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t vmPutMsgToSyncCtrlQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToQueryQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToStreamQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index d8456ad6264c5c784be91afdcd8c58b832a99ee8..ee570aeca28e84423f5ae79da50a252087911c57 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -423,6 +423,8 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_SET_VNODE_STANDBY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, vmPutMsgToSyncCtrlQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncCtrlQueue, 0) == NULL) goto _OVER; code = 0; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index d7df30bc7587cada18a308176e2fc739c1679afa..ae763efdb4e985d55f9c66a60224745b040596ca 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -136,6 +136,22 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf } } +static void vmProcessSyncCtrlQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { + SVnodeObj *pVnode = pInfo->ahandle; + SRpcMsg *pMsg = NULL; + + for (int32_t i = 0; i < numOfMsgs; ++i) { + if (taosGetQitem(qall, (void **)&pMsg) == 0) continue; + const STraceId *trace = &pMsg->info.traceId; + dGTrace("vgId:%d, msg:%p get from vnode-sync queue", pVnode->vgId, pMsg); + + int32_t code = vnodeProcessSyncCtrlMsg(pVnode->pImpl, pMsg, NULL); // no response here + dGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code); + rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); + } +} + static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtype) { const STraceId *trace = &pMsg->info.traceId; SMsgHead *pHead = pMsg->pCont; @@ -217,6 +233,8 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp return code; } +int32_t vmPutMsgToSyncCtrlQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, SYNC_CTRL_QUEUE); } + int32_t vmPutMsgToSyncQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, SYNC_QUEUE); } int32_t vmPutMsgToWriteQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, WRITE_QUEUE); } @@ -301,6 +319,7 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) { int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode->pImpl, (FItems)vnodeProposeWriteMsg); pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)vmProcessSyncQueue); + pVnode->pSyncCtrlQ = tWWorkerAllocQueue(&pMgmt->syncCtrlPool, pVnode, (FItems)vmProcessSyncCtrlQueue); pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->applyPool, pVnode->pImpl, (FItems)vnodeApplyWriteMsg); pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue); pVnode->pStreamQ = tQWorkerAllocQueue(&pMgmt->streamPool, pVnode, (FItem)vmProcessStreamQueue); @@ -325,6 +344,7 @@ void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ); tWWorkerFreeQueue(&pMgmt->applyPool, pVnode->pApplyQ); tWWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ); + tWWorkerFreeQueue(&pMgmt->syncCtrlPool, pVnode->pSyncCtrlQ); tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ); tQWorkerFreeQueue(&pMgmt->streamPool, pVnode->pStreamQ); tWWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ); @@ -370,6 +390,11 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) { pSPool->max = tsNumOfVnodeSyncThreads; if (tWWorkerInit(pSPool) != 0) return -1; + SWWorkerPool *pSCPool = &pMgmt->syncCtrlPool; + pSCPool->name = "vnode-sync-ctrl"; + pSCPool->max = tsNumOfVnodeSyncThreads; + if (tWWorkerInit(pSCPool) != 0) return -1; + SSingleWorkerCfg mgmtCfg = { .min = 1, .max = 1, @@ -398,6 +423,7 @@ void vmStopWorker(SVnodeMgmt *pMgmt) { tWWorkerCleanup(&pMgmt->writePool); tWWorkerCleanup(&pMgmt->applyPool); tWWorkerCleanup(&pMgmt->syncPool); + tWWorkerCleanup(&pMgmt->syncCtrlPool); tQWorkerCleanup(&pMgmt->queryPool); tQWorkerCleanup(&pMgmt->streamPool); tWWorkerCleanup(&pMgmt->fetchPool); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index d006f12a3f393b7ed562de56a9b14072fa81f260..16e9723e8add5813a3753ec0a1fc77e749084fe3 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -82,6 +82,7 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg); int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp); int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); +int32_t vnodeProcessSyncCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg); int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 353c3874c00173ad1628cdf28747e08382cc8152..e89d52c99c2c5a8c0fd60b71de5e0e106b07a6dd 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -323,6 +323,49 @@ void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { } } +int32_t vnodeProcessSyncCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { + int32_t code = 0; + const STraceId *trace = &pMsg->info.traceId; + + if (!syncEnvIsStart()) { + vGError("vgId:%d, msg:%p failed to process since sync env not start", pVnode->config.vgId); + terrno = TSDB_CODE_APP_ERROR; + return -1; + } + + SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync); + if (pSyncNode == NULL) { + vGError("vgId:%d, msg:%p failed to process since invalid sync node", pVnode->config.vgId); + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return -1; + } + + vGTrace("vgId:%d, sync msg:%p will be processed, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType)); + + if (pMsg->msgType == TDMT_SYNC_HEARTBEAT) { + SyncHeartbeat *pSyncMsg = syncHeartbeatFromRpcMsg2(pMsg); + code = syncNodeOnHeartbeat(pSyncNode, pSyncMsg); + syncHeartbeatDestroy(pSyncMsg); + + } else if (pMsg->msgType == TDMT_SYNC_HEARTBEAT_REPLY) { + SyncHeartbeatReply *pSyncMsg = syncHeartbeatReplyFromRpcMsg2(pMsg); + code = syncNodeOnHeartbeatReply(pSyncNode, pSyncMsg); + syncHeartbeatReplyDestroy(pSyncMsg); + + } else { + vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg->msgType); + code = -1; + } + + vTrace("vgId:%d, sync msg:%p is processed, type:%s code:0x%x", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType), + code); + syncNodeRelease(pSyncNode); + if (code != 0 && terrno == 0) { + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + } + return code; +} + int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { int32_t code = 0; const STraceId *trace = &pMsg->info.traceId; @@ -473,6 +516,19 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { return code; } +static int32_t vnodeSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { + if (msgcb == NULL) { + return -1; + } + + int32_t code = tmsgPutToQueue(msgcb, SYNC_CTRL_QUEUE, pMsg); + if (code != 0) { + rpcFreeCont(pMsg->pCont); + pMsg->pCont = NULL; + } + return code; +} + static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { if (msgcb == NULL) { return -1; @@ -758,6 +814,7 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) { .msgcb = NULL, .FpSendMsg = vnodeSyncSendMsg, .FpEqMsg = vnodeSyncEqMsg, + .FpEqCtrlMsg = vnodeSyncEqCtrlMsg, }; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", path, TD_DIRSEP); diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 0afc373f2d0d90333002245e43817d12ef02df8c..e699079a3f6ad09f20f26d6bd6d784f0c5f120ff 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -73,6 +73,7 @@ typedef struct SSyncNode { const SMsgCb* msgcb; int32_t (*FpSendMsg)(const SEpSet* pEpSet, SRpcMsg* pMsg); int32_t (*FpEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg); + int32_t (*FpEqCtrlMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg); // init internal SNodeInfo myNodeInfo; @@ -287,6 +288,12 @@ void syncLogRecvAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntries void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s); void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s); +void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s); +void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s); + +void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s); +void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s); + // for debug -------------- void syncNodePrint(SSyncNode* pObj); void syncNodePrint2(char* s, SSyncNode* pObj); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 76fd345bdd1226a960c13c94de9d86ea7d954f1e..13cf5b4995d7851f78132e7445dd58173d9c2111 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -974,6 +974,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { pSyncNode->msgcb = pSyncInfo->msgcb; pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg; pSyncNode->FpEqMsg = pSyncInfo->FpEqMsg; + pSyncNode->FpEqCtrlMsg = pSyncInfo->FpEqCtrlMsg; // init raft config pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath); @@ -2698,6 +2699,45 @@ int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) { return ret; } +int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) { + syncLogRecvHeartbeat(ths, pMsg, ""); + + SyncHeartbeatReply* pMsgReply = syncHeartbeatReplyBuild(ths->vgId); + pMsgReply->destId = pMsg->srcId; + pMsgReply->srcId = ths->myRaftId; + pMsgReply->term = ths->pRaftStore->currentTerm; + pMsgReply->privateTerm = 8864; // magic number + + SRpcMsg rpcMsg; + syncHeartbeatReply2RpcMsg(pMsgReply, &rpcMsg); + + if (pMsg->term == ths->pRaftStore->currentTerm) { + sInfo("vgId:%d, heartbeat reset timer", 1); + syncNodeResetElectTimer(ths); + } + + /* + // htonl + SMsgHead* pHead = rpcMsg.pCont; + pHead->contLen = htonl(pHead->contLen); + pHead->vgId = htonl(pHead->vgId); + */ + + // reply + syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg); + + return 0; +} + +int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg) { + syncLogRecvHeartbeatReply(ths, pMsg, ""); + + // update last reply time, make decision whether the other node is alive or not + syncIndexMgrSetRecvTime(ths->pMatchIndex, &(pMsg->destId), pMsg->startTime); + + return 0; +} + // TLA+ Spec // ClientRequest(i, v) == // /\ state[i] = Leader @@ -3304,3 +3344,45 @@ void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntries host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s); syncNodeEventLog(pSyncNode, logBuf); } + +void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "send sync-heartbeat from %s:%d {term:%" PRIu64 ", cmt:%" PRIu64 ", pterm:%" PRIu64 "}, %s", host, port, + pMsg->term, pMsg->commitIndex, pMsg->privateTerm, s); + syncNodeEventLog(pSyncNode, logBuf); +} + +void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "recv sync-heartbeat from %s:%d {term:%" PRIu64 ", cmt:%" PRIu64 ", pterm:%" PRIu64 "}, %s", host, port, + pMsg->term, pMsg->commitIndex, pMsg->privateTerm, s); + syncNodeEventLog(pSyncNode, logBuf); +} + +void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), "send sync-heartbeat-reply from %s:%d {term:%" PRIu64 ", pterm:%" PRIu64 "}, %s", + host, port, pMsg->term, pMsg->privateTerm, s); + syncNodeEventLog(pSyncNode, logBuf); +} + +void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), "recv sync-heartbeat-reply from %s:%d {term:%" PRIu64 ", pterm:%" PRIu64 "}, %s", + host, port, pMsg->term, pMsg->privateTerm, s); + syncNodeEventLog(pSyncNode, logBuf); +} \ No newline at end of file