diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 455898585aaec2935d72aab0cdf6dfab6a0aac48..bf683fc9ac4f9f5c2d7b3ad82f6fe1f9a213432b 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -158,6 +158,8 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_DROP_INDEX, "mnode-drop-index", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GET_DB_CFG, "mnode-get-db-cfg", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GET_INDEX, "mnode-get-index", NULL, NULL) + + TD_DEF_MSG_TYPE(TDMT_MND_APPLY_MSG, "mnode-apply-msg", NULL, NULL) // Requests handled by VNODE TD_NEW_MSG_SEG(TDMT_VND_MSG) diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index 28c470a4437d46cdaf213d857e51c6e67108d1d4..16cb4cdd7012aac6145b6c5d0c7319efdfeb60ec 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -90,6 +90,9 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad); */ int32_t mndProcessMsg(SRpcMsg *pMsg); + +int32_t mndProcessApplyMsg(SRpcMsg *pMsg); + /** * @brief Generate machine code */ diff --git a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h index 75e83d65471fdebfba4fdbfa3083a2dc02f7fd22..380ae63b8d3cb490576e7b05497df0b38f1695b9 100644 --- a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h +++ b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h @@ -33,6 +33,7 @@ typedef struct SMnodeMgmt { SSingleWorker readWorker; SSingleWorker writeWorker; SSingleWorker syncWorker; + SSingleWorker applyWorker; SSingleWorker monitorWorker; SReplica replicas[TSDB_MAX_REPLICA]; int8_t replica; @@ -59,6 +60,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt); void mmStopWorker(SMnodeMgmt *pMgmt); int32_t mmPutNodeMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutNodeMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t mmPutNodeMsgToApplyQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutNodeMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t 
mmPutNodeMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c index 4f7fd4a1c0c925093b3773e06b9dfba1718ce945..68b2889278a96348d3c9395e4488f568edfd53a7 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c @@ -122,6 +122,11 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { return -1; } + if (syncInit() != 0) { + dError("failed to init sync since %s", terrstr()); + return -1; + } + SMnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SMnodeMgmt)); if (pMgmt == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index c4314a57b15ebc18df872261296911cc62ed07bc..04fe891ce9a9001062569a7d32f1ad8f2f53d8f9 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -56,6 +56,32 @@ static void mmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { taosFreeQitem(pMsg); } + +static void mmProcessApplyQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { + SMnodeMgmt *pMgmt = pInfo->ahandle; + int32_t code = -1; + tmsg_t msgType = pMsg->msgType; + bool isRequest = msgType & 1U; + dTrace("msg:%p, get from mnode-apply queue", pMsg); + + pMsg->info.node = pMgmt->pMnode; + + code = mndProcessApplyMsg(pMsg); /* keep the result so the trace below reports the real outcome */ + + /* + if (isRequest) { + if (pMsg->info.handle != NULL && code != 0) { + if (code != 0 && terrno != 0) code = terrno; + mmSendRsp(pMsg, code); + } + } + */ + + dTrace("msg:%p, is freed, code:0x%x", pMsg, code); + pMsg->pCont = NULL; /* mndProcessApplyMsg has already released pCont; a second rpcFreeCont here would be a double free */ + taosFreeQitem(pMsg); +} + static void mmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { SMnodeMgmt *pMgmt = pInfo->ahandle; int32_t code = -1; @@ -92,6 +118,10 @@ int32_t mmPutNodeMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutNodeMsgToWorker(&pMgmt->syncWorker, pMsg); } +int32_t mmPutNodeMsgToApplyQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { + 
return mmPutNodeMsgToWorker(&pMgmt->applyWorker, pMsg); +} + int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutNodeMsgToWorker(&pMgmt->readWorker, pMsg); } @@ -179,6 +209,18 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { return -1; } + SSingleWorkerCfg aCfg = { + .min = 1, + .max = 1, + .name = "mnode-apply", + .fp = (FItem)mmProcessApplyQueue, + .param = pMgmt, + }; + if (tSingleWorkerInit(&pMgmt->applyWorker, &aCfg) != 0) { + dError("failed to start mnode mnode-apply worker since %s", terrstr()); + return -1; + } + SSingleWorkerCfg mCfg = { .min = 1, .max = 1, diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index 5258fa9e023a49e3fdf4ea41b2785d3ed93a27a8..0982096d254b12c39a68c7d547774502af850622 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -75,7 +75,9 @@ typedef struct { int32_t errCode; sem_t syncSem; SWal *pWal; - SSyncNode *pSyncNode; + //SSyncNode *pSyncNode; + int64_t sync; + ESyncState state; } SSyncMgmt; diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 3dbe3241a78f617a99148f8c571189eea41e17b5..e7144f5673ef46fe0c9c08cb8c5c0274c07e3578 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -46,6 +46,12 @@ static void mndCloseWal(SMnode *pMnode) { } static int32_t mndRestoreWal(SMnode *pMnode) { + +// do nothing +return 0; + +#if 0 + SWal *pWal = pMnode->syncMgmt.pWal; SSdb *pSdb = pMnode->pSdb; int64_t lastSdbVer = sdbUpdateVer(pSdb, 0); @@ -114,6 +120,70 @@ static int32_t mndRestoreWal(SMnode *pMnode) { _OVER: walCloseReadHandle(pHandle); return code; + +#endif + +} + +int32_t mndSyncEqMsg(const SMsgCb* msgcb, SRpcMsg *pMsg) { + int32_t ret = -1; /* stays -1 when no sync queue is registered, so the dropped message is reported */ + if (msgcb->queueFps[SYNC_QUEUE] != NULL) { + ret = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); /* propagate the enqueue result instead of discarding it */ + } else { + mError("mndSyncEqMsg queue is NULL, SYNC_QUEUE:%d", SYNC_QUEUE); + } + return ret; +} + +int32_t 
mndSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { + int32_t ret = 0; + pMsg->info.noResp = 1; + tmsgSendReq(pEpSet, pMsg); + return ret; +} + +void mndSyncCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { + SyncIndex beginIndex = SYNC_INDEX_INVALID; + if (pFsm->FpGetSnapshot != NULL) { + SSnapshot snapshot; + pFsm->FpGetSnapshot(pFsm, &snapshot); + beginIndex = snapshot.lastApplyIndex; + } + + if (cbMeta.index > beginIndex) { + SMnode *pMnode = pFsm->data; + SSyncMgmt *pMgmt = &pMnode->syncMgmt; + + mndProcessApplyMsg((SRpcMsg*)pMsg); + //mmPutNodeMsgToApplyQueue(pMnode->pWrapper->pMgmt, pMsg); + + if (cbMeta.state == TAOS_SYNC_STATE_LEADER) { + tsem_post(&pMgmt->syncSem); + } + } +} + +void mndSyncPreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { + // strict consistent, do nothing +} + +void mndSyncRollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { + // strict consistent, do nothing +} + +int32_t mndSyncGetSnapshotCb(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) { + pSnapshot->lastApplyIndex = SYNC_INDEX_INVALID; /* no snapshot yet; mndSyncCommitCb reads this field, so it must not be left uninitialised */ + return 0; +} + +SSyncFSM *syncMnodeMakeFsm(SMnode *pMnode) { + SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM)); /* zeroed so any member not assigned below starts as NULL/0 */ + pFsm->data = pMnode; + pFsm->FpCommitCb = mndSyncCommitCb; + pFsm->FpPreCommitCb = mndSyncPreCommitCb; + pFsm->FpRollBackCb = mndSyncRollBackCb; + pFsm->FpGetSnapshot = mndSyncGetSnapshotCb; + return pFsm; } int32_t mndInitSync(SMnode *pMnode) { @@ -133,7 +203,27 @@ int32_t mndInitSync(SMnode *pMnode) { if (pMnode->selfId == 1) { pMgmt->state = TAOS_SYNC_STATE_LEADER; } - pMgmt->pSyncNode = NULL; + + // pMgmt->pSyncNode = NULL; + SSyncInfo syncInfo = {0}; /* fields not set explicitly below must not reach syncOpen as stack garbage */ + syncInfo.vgId = 1; + SSyncCfg *pCfg = &(syncInfo.syncCfg); + pCfg->replicaNum = pMnode->replica; + pCfg->myIndex = pMnode->selfIndex; + for (int i = 0; i < pMnode->replica; ++i) { /* one nodeInfo slot per replica; slot i mirrors replicas[i] */ + snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", (pMnode->replicas)[i].fqdn); + pCfg->nodeInfo[i].nodePort = 
(pMnode->replicas)[i].port; + } + snprintf(syncInfo.path, sizeof(syncInfo.path), "%s/sync", pMnode->path); + syncInfo.pWal = pMnode->syncMgmt.pWal; + + syncInfo.pFsm = syncMnodeMakeFsm(pMnode); + syncInfo.FpSendMsg = mndSendMsg; + syncInfo.FpEqMsg = mndSyncEqMsg; + + pMnode->syncMgmt.sync = syncOpen(&syncInfo); + ASSERT(pMnode->syncMgmt.sync > 0); + return 0; } @@ -157,6 +247,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) { SWal *pWal = pMnode->syncMgmt.pWal; SSdb *pSdb = pMnode->pSdb; +#if 0 int64_t ver = sdbUpdateVer(pSdb, 1); if (walWrite(pWal, ver, 1, pRaw, sdbGetRawTotalSize(pRaw)) < 0) { sdbUpdateVer(pSdb, -1); @@ -168,24 +259,32 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) { walCommit(pWal, ver); walFsync(pWal, true); -#if 1 return 0; + #else + if (pMnode->replica == 1) return 0; SSyncMgmt *pMgmt = &pMnode->syncMgmt; pMgmt->errCode = 0; - SSyncBuffer buf = {.data = pRaw, .len = sdbGetRawTotalSize(pRaw)}; + //SSyncBuffer buf = {.data = pRaw, .len = sdbGetRawTotalSize(pRaw)}; + + SRpcMsg rpcMsg = {0}; /* zero every field (info, code, ...) that is not set explicitly below */ + rpcMsg.msgType = TDMT_MND_APPLY_MSG; /* the message type belongs in msgType; code is the result field */ + rpcMsg.contLen = sdbGetRawTotalSize(pRaw); + rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); + memcpy(rpcMsg.pCont, pRaw, rpcMsg.contLen); bool isWeak = false; - int32_t code = syncPropose(pMgmt->pSyncNode, &buf, pMnode, isWeak); + int32_t code = syncPropose(pMgmt->sync, &rpcMsg, isWeak); if (code != 0) return code; tsem_wait(&pMgmt->syncSem); return pMgmt->errCode; #endif + } bool mndIsMaster(SMnode *pMnode) { diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 35ecaa748ecdbdce486814f063b7678004f68909..f4fa4beaf17a8b8303915a2fa0101c47cbde9076 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -683,11 +683,14 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) { mDebug("trans:%d, sync finished", pTrans->id); - code = sdbWrite(pMnode->pSdb, pRaw); +// do it in state machine commit cb +#if 0 + code = 
sdbWriteWithout(pMnode->pSdb, pRaw); if (code != 0) { mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr()); return -1; } +#endif return 0; } diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 8c805dd8c705d41f62a5728c8bf978d0f30924d5..f8e5a65f0f9983f44232451e8874fff7fc9ab1b3 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -336,9 +336,25 @@ int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) { return 0; } -int32_t mndStart(SMnode *pMnode) { return mndInitTimer(pMnode); } +int32_t mndStart(SMnode *pMnode) { + syncStart(pMnode->syncMgmt.sync); + return mndInitTimer(pMnode); +} + +void mndStop(SMnode *pMnode) { + syncStop(pMnode->syncMgmt.sync); + mndCleanupTimer(pMnode); /* mndStop returns void, so the call is a plain statement rather than a return expression */ +} + +int32_t mndProcessApplyMsg(SRpcMsg *pMsg) { -void mndStop(SMnode *pMnode) { return mndCleanupTimer(pMnode); } + SSdbRaw *pRaw = pMsg->pCont; + SMnode *pMnode = pMsg->info.node; + int32_t code = sdbWriteWithoutFree(pMnode->pSdb, pRaw); + rpcFreeCont(pMsg->pCont); + + return code; +} int32_t mndProcessMsg(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index d9ff60bbe22b573db34331e5aabbd04b06ff5616..9c02b91ef0043eb899493e1b9d4706eff430bb55 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -55,14 +55,17 @@ static void syncFreeNode(void* param); // --------------------------------- int32_t syncInit() { - int32_t ret; - tsNodeRefId = taosOpenRef(200, syncFreeNode); - if (tsNodeRefId < 0) { - sError("failed to init node ref"); - syncCleanUp(); - ret = -1; - } else { - ret = syncEnvStart(); + int32_t ret = 0; + + if (!syncEnvIsStart()) { + tsNodeRefId = taosOpenRef(200, syncFreeNode); + if (tsNodeRefId < 0) { + sError("failed to init node ref"); + syncCleanUp(); + ret = -1; + } else { + ret = syncEnvStart(); + } } return ret;