From 5745223bca7b7c8218edb581f7d492efc0687040 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 17 Oct 2022 10:33:27 +0800 Subject: [PATCH] refactor(sync): add double queues in mnode --- include/dnode/mnode/mnode.h | 1 + source/dnode/mgmt/mgmt_mnode/inc/mmInt.h | 2 ++ source/dnode/mgmt/mgmt_mnode/src/mmHandle.c | 3 ++ source/dnode/mgmt/mgmt_mnode/src/mmWorker.c | 38 ++++++++++++++++++++ source/dnode/mnode/impl/src/mndMain.c | 39 +++++++++++++++++++++ source/libs/sync/inc/syncSnapshot.h | 20 ++++++----- source/libs/sync/src/syncMain.c | 24 ++++++++----- source/libs/sync/src/syncReplication.c | 4 +++ source/libs/sync/src/syncSnapshot.c | 12 ++++--- tests/script/tsim/sync/sync2-test.sim | 9 +++++ 10 files changed, 130 insertions(+), 22 deletions(-) diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index 0d43539629..a58f790fa3 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -98,6 +98,7 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad); */ int32_t mndProcessRpcMsg(SRpcMsg *pMsg); int32_t mndProcessSyncMsg(SRpcMsg *pMsg); +int32_t mndProcessSyncCtrlMsg(SRpcMsg *pMsg); int32_t mndPreProcessQueryMsg(SRpcMsg *pMsg); void mndPostProcessQueryMsg(SRpcMsg *pMsg); diff --git a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h index a4c37dd334..d329b6b732 100644 --- a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h +++ b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h @@ -34,6 +34,7 @@ typedef struct SMnodeMgmt { SSingleWorker readWorker; SSingleWorker writeWorker; SSingleWorker syncWorker; + SSingleWorker syncCtrlWorker; SSingleWorker monitorWorker; bool stopped; int32_t refCount; @@ -56,6 +57,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt); void mmStopWorker(SMnodeMgmt *pMgmt); int32_t mmPutMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t mmPutMsgToSyncCtrlQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutMsgToFetchQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index ec761e6441..d930c1866e 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -257,6 +257,9 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_SYNC_SET_MNODE_STANDBY_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_SET_VNODE_STANDBY_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, mmPutMsgToSyncCtrlQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, mmPutMsgToSyncCtrlQueue, 1) == NULL) goto _OVER; + code = 0; _OVER: diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index d2b071ec61..ebaa0e4d85 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -77,6 +77,24 @@ static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) { taosFreeQitem(pMsg); } +static void mmProcessSyncCtrlMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) { + SMnodeMgmt *pMgmt = pInfo->ahandle; + pMsg->info.node = pMgmt->pMnode; + + const STraceId *trace = &pMsg->info.traceId; + dGTrace("msg:%p, get from mnode-sync-ctrl queue", pMsg); + + SMsgHead *pHead = pMsg->pCont; + pHead->contLen = ntohl(pHead->contLen); + pHead->vgId = ntohl(pHead->vgId); + + int32_t code = mndProcessSyncCtrlMsg(pMsg); + + dGTrace("msg:%p, is freed, code:0x%x", pMsg, code); + rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); +} + static void mmProcessSyncMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) { SMnodeMgmt *pMgmt = pInfo->ahandle; pMsg->info.node = pMgmt->pMnode; @@ -118,6 +136,10 @@ int32_t mmPutMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutMsgToWorker(pMgmt, &pMgmt->syncWorker, pMsg); } +int32_t mmPutMsgToSyncCtrlQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { + return mmPutMsgToWorker(pMgmt, &pMgmt->syncCtrlWorker, pMsg); +} + int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutMsgToWorker(pMgmt, &pMgmt->readWorker, pMsg); } @@ -158,6 +180,9 @@ int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { case SYNC_QUEUE: pWorker = &pMgmt->syncWorker; break; + case SYNC_CTRL_QUEUE: + pWorker = &pMgmt->syncCtrlWorker; + break; default: terrno = TSDB_CODE_INVALID_PARA; } @@ -237,6 +262,18 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { return -1; } + SSingleWorkerCfg scCfg = { + .min = 1, + .max = 1, + .name = "mnode-sync-ctrl", + .fp = (FItem)mmProcessSyncCtrlMsg, + .param = pMgmt, + }; + if (tSingleWorkerInit(&pMgmt->syncCtrlWorker, &scCfg) != 0) { + dError("failed to start mnode mnode-sync-ctrl worker since %s", terrstr()); + return -1; + } + SSingleWorkerCfg mCfg = { .min = 1, .max = 1, @@ -262,5 +299,6 @@ void mmStopWorker(SMnodeMgmt *pMgmt) { tSingleWorkerCleanup(&pMgmt->readWorker); tSingleWorkerCleanup(&pMgmt->writeWorker); tSingleWorkerCleanup(&pMgmt->syncWorker); + tSingleWorkerCleanup(&pMgmt->syncCtrlWorker); dDebug("mnode workers are closed"); } diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 275fc76e36..ca6ddc0c0d 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -444,6 +444,45 @@ void mndStop(SMnode *pMnode) { mndCleanupTimer(pMnode); } +int32_t mndProcessSyncCtrlMsg(SRpcMsg *pMsg) { + SMnode *pMnode = pMsg->info.node; + SSyncMgmt *pMgmt = &pMnode->syncMgmt; + int32_t code = 0; + + mInfo("vgId:%d, process sync ctrl msg", 1); + + if (!syncEnvIsStart()) { + mError("failed to process sync msg:%p type:%s since syncEnv stop", pMsg, TMSG_INFO(pMsg->msgType)); + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return -1; + } + + SSyncNode *pSyncNode = syncNodeAcquire(pMgmt->sync); + if (pSyncNode == NULL) { + mError("failed to process sync msg:%p type:%s since syncNode is null", pMsg, TMSG_INFO(pMsg->msgType)); + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return -1; + } + + if (pMsg->msgType == TDMT_SYNC_HEARTBEAT) { + SyncHeartbeat *pSyncMsg = syncHeartbeatFromRpcMsg2(pMsg); + code = syncNodeOnHeartbeat(pSyncNode, pSyncMsg); + syncHeartbeatDestroy(pSyncMsg); + + } else if (pMsg->msgType == TDMT_SYNC_HEARTBEAT_REPLY) { + SyncHeartbeatReply *pSyncMsg = syncHeartbeatReplyFromRpcMsg2(pMsg); + code = syncNodeOnHeartbeatReply(pSyncNode, pSyncMsg); + syncHeartbeatReplyDestroy(pSyncMsg); + } + + syncNodeRelease(pSyncNode); + + if (code != 0) { + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + } + return code; +} + int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; SSyncMgmt *pMgmt = &pMnode->syncMgmt; diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index 9ae260a308..ed5fc553d6 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -40,14 +40,14 @@ typedef struct SSyncSnapshotSender { bool start; int32_t seq; int32_t ack; - void * pReader; - void * pCurrentBlock; + void *pReader; + void *pCurrentBlock; int32_t blockLen; SSnapshotParam snapshotParam; SSnapshot snapshot; SSyncCfg lastConfig; int64_t sendingMS; - SSyncNode * pSyncNode; + SSyncNode *pSyncNode; int32_t replicaIndex; SyncTerm term; SyncTerm privateTerm; @@ -64,20 +64,22 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender); int32_t snapshotReSend(SSyncSnapshotSender *pSender); cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender); -char * snapshotSender2Str(SSyncSnapshotSender *pSender); -char * snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event); +char *snapshotSender2Str(SSyncSnapshotSender *pSender); +char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event); + +int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId); //--------------------------------------------------- typedef struct SSyncSnapshotReceiver { bool start; int32_t ack; - void * pWriter; + void *pWriter; SyncTerm term; SyncTerm privateTerm; SSnapshotParam snapshotParam; SSnapshot snapshot; SRaftId fromId; - SSyncNode * pSyncNode; + SSyncNode *pSyncNode; } SSyncSnapshotReceiver; @@ -89,8 +91,8 @@ bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver); cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver); -char * snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); -char * snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event); +char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); +char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event); //--------------------------------------------------- // on message diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 12aedbe5d1..bc791af0bb 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1493,15 +1493,19 @@ static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) { } int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) { - pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine; - int32_t ret = syncNodeDoStartHeartbeatTimer(pSyncNode); + int32_t ret = 0; - do { - for (int i = 0; i < pSyncNode->peersNum; ++i) { - SSyncTimer* pSyncTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[i])); - syncHbTimerStart(pSyncNode, pSyncTimer); - } - } while (0); + if (syncNodeIsMnode(pSyncNode)) { + pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine; + ret = syncNodeDoStartHeartbeatTimer(pSyncNode); + } else { + do { + for (int i = 0; i < pSyncNode->peersNum; ++i) { + SSyncTimer* pSyncTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[i])); + syncHbTimerStart(pSyncNode, pSyncTimer); + } + } while (0); + } return ret; } @@ -2954,6 +2958,8 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncI } int32_t syncNodeOnClientRequest(SSyncNode* ths, SyncClientRequest* pMsg, SyncIndex* pRetIndex) { + syncNodeEventLog(ths, "on client request"); + int32_t ret = 0; int32_t code = 0; @@ -3003,6 +3009,8 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SyncClientRequest* pMsg, SyncInd } int32_t syncNodeOnClientRequestBatchCb(SSyncNode* ths, SyncClientRequestBatch* pMsg) { + syncNodeEventLog(ths, "on client request batch"); + int32_t code = 0; if (ths->state != TAOS_SYNC_STATE_LEADER) { diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index fc293b9149..2100c795e6 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -487,6 +487,8 @@ int32_t syncNodeDoAppendEntries(SSyncNode* pSyncNode, SRaftId* pDestId) { SyncIndex logEndIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore); if (nextIndex < logStartIndex || nextIndex > logEndIndex) { // start snapshot + int32_t code = syncNodeStartSnapshot(pSyncNode, pDestId); + ASSERT(code == 0); return 0; } @@ -546,6 +548,8 @@ int32_t syncNodeDoAppendEntries(SSyncNode* pSyncNode, SRaftId* pDestId) { } int32_t syncNodeDoReplicate(SSyncNode* pSyncNode) { + syncNodeEventLog(pSyncNode, "do replicate"); + if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) { return -1; } diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 2014449faf..a1bf447cf9 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -376,14 +376,14 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) { char *snapshotSender2Str(SSyncSnapshotSender *pSender) { cJSON *pJson = snapshotSender2Json(pSender); - char * serialized = cJSON_Print(pJson); + char *serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) { int32_t len = 256; - char * s = taosMemoryMalloc(len); + char *s = taosMemoryMalloc(len); SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; char host[64]; @@ -402,6 +402,8 @@ char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) { return s; } +int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) { return 0; } + // ------------------------------------- SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId) { bool condition = (pSyncNode->pFsm->FpSnapshotStartWrite != NULL) && (pSyncNode->pFsm->FpSnapshotStopWrite != NULL) && @@ -655,7 +657,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { cJSON_AddStringToObject(pFromId, "addr", u64buf); { uint64_t u64 = pReceiver->fromId.addr; - cJSON * pTmp = pFromId; + cJSON *pTmp = pFromId; char host[128] = {0}; uint16_t port; syncUtilU642Addr(u64, host, sizeof(host), &port); @@ -688,14 +690,14 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) { cJSON *pJson = snapshotReceiver2Json(pReceiver); - char * serialized = cJSON_Print(pJson); + char *serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event) { int32_t len = 256; - char * s = taosMemoryMalloc(len); + char *s = taosMemoryMalloc(len); SRaftId fromId = pReceiver->fromId; char host[128]; diff --git a/tests/script/tsim/sync/sync2-test.sim b/tests/script/tsim/sync/sync2-test.sim index 0419381d3c..8d147086a7 100644 --- a/tests/script/tsim/sync/sync2-test.sim +++ b/tests/script/tsim/sync/sync2-test.sim @@ -111,6 +111,9 @@ else endi + +return 0 + vg_ready: print ====> create stable/child table sql create table stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int) @@ -123,6 +126,12 @@ endi sql create table ct1 using stb tags(1000) + + + + + + print ====> step1 insert 1000 records $N = 1000 $count = 0 -- GitLab