diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index bf683fc9ac4f9f5c2d7b3ad82f6fe1f9a213432b..e8e931daa50292f333b3c56cff0983ed09bb3638 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -158,7 +158,6 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_DROP_INDEX, "mnode-drop-index", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GET_DB_CFG, "mnode-get-db-cfg", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GET_INDEX, "mnode-get-index", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_APPLY_MSG, "mnode-apply-msg", NULL, NULL) // Requests handled by VNODE diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index ec4b4dd06d5621b4b47b66aff6fe5cdda15899a1..5b028d5055a2898b84533e00f44a49d54b2d3653 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -89,9 +89,7 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad); * @return int32_t 0 for success, -1 for failure. */ int32_t mndProcessMsg(SRpcMsg *pMsg); - int32_t mndProcessSyncMsg(SRpcMsg *pMsg); - int32_t mndProcessApplyMsg(SRpcMsg *pMsg); /** diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index 89d657d2d5eba1df82ddf4a9bdbb10d959befada..0f1d3252372e49bb46f731412fa64b903bf936e1 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -59,32 +59,17 @@ static void mmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { static void mmProcessSyncQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { SMnodeMgmt *pMgmt = pInfo->ahandle; pMsg->info.node = pMgmt->pMnode; - mndProcessSyncMsg(pMsg); - return; } static void mmProcessApplyQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { SMnodeMgmt *pMgmt = pInfo->ahandle; - int32_t code = -1; - tmsg_t msgType = pMsg->msgType; - bool isRequest = msgType & 1U; - dTrace("msg:%p, get from mnode-query queue", pMsg); + dTrace("msg:%p, get from mnode-apply queue", pMsg); pMsg->info.node = pMgmt->pMnode; - mndProcessApplyMsg(pMsg); - /* - if (isRequest) { - if (pMsg->info.handle != NULL && code != 0) { - if (code != 0 && terrno != 0) code = terrno; - mmSendRsp(pMsg, code); - } - } - */ - - dTrace("msg:%p, is freed, code:0x%x", pMsg, code); + dTrace("msg:%p, is freed", pMsg); rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); } diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index 0982096d254b12c39a68c7d547774502af850622..16945b1403f94b09e71ec6af421376eab3fd87c7 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -19,6 +19,7 @@ #include "mndDef.h" #include "sdb.h" +#include "syncTools.h" #include "tcache.h" #include "tdatablock.h" #include "tglobal.h" @@ -31,12 +32,14 @@ extern "C" { #endif +// clang-format off #define mFatal(...) { if (mDebugFlag & DEBUG_FATAL) { taosPrintLog("MND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} #define mError(...) { if (mDebugFlag & DEBUG_ERROR) { taosPrintLog("MND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} #define mWarn(...) { if (mDebugFlag & DEBUG_WARN) { taosPrintLog("MND WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} #define mInfo(...) { if (mDebugFlag & DEBUG_INFO) { taosPrintLog("MND ", DEBUG_INFO, 255, __VA_ARGS__); }} #define mDebug(...) { if (mDebugFlag & DEBUG_DEBUG) { taosPrintLog("MND ", DEBUG_DEBUG, mDebugFlag, __VA_ARGS__); }} #define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", DEBUG_TRACE, mDebugFlag, __VA_ARGS__); }} +// clang-format on #define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE) #define SYSTABLE_SCH_DB_NAME_LEN ((TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE) @@ -75,9 +78,7 @@ typedef struct { int32_t errCode; sem_t syncSem; SWal *pWal; - //SSyncNode *pSyncNode; - int64_t sync; - + int64_t sync; ESyncState state; } SSyncMgmt; diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index e7144f5673ef46fe0c9c08cb8c5c0274c07e3578..d8eb021462565ad09a7f0211ad92ba7b2bc24827 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -125,24 +125,11 @@ _OVER: } -int32_t mndSyncEqMsg(const SMsgCb* msgcb, SRpcMsg *pMsg) { - int32_t ret = 0; - if (msgcb->queueFps[SYNC_QUEUE] != NULL) { - tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); - } else { - mError("mndSyncEqMsg queue is NULL, SYNC_QUEUE:%d", SYNC_QUEUE); - } - return ret; -} +int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); } -int32_t mndSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { - int32_t ret = 0; - pMsg->info.noResp = 1; - tmsgSendReq(pEpSet, pMsg); - return ret; -} +int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); } -void mndSyncCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { +void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { SyncIndex beginIndex = SYNC_INDEX_INVALID; if (pFsm->FpGetSnapshot != NULL) { SSnapshot snapshot; @@ -163,26 +150,26 @@ void mndSyncCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMe } } -void mndSyncPreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { +void mndSyncPreCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { // strict consistent, do nothing } -void mndSyncRollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { +void mndSyncRollBackMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { // strict consistent, do nothing } -int32_t mndSyncGetSnapshotCb(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) { +int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) { // snapshot return 0; } -SSyncFSM *syncMnodeMakeFsm(SMnode *pMnode) { - SSyncFSM *pFsm = (SSyncFSM *)taosMemoryMalloc(sizeof(SSyncFSM)); +SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) { + SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM)); pFsm->data = pMnode; - pFsm->FpCommitCb = mndSyncCommitCb; - pFsm->FpPreCommitCb = mndSyncPreCommitCb; - pFsm->FpRollBackCb = mndSyncRollBackCb; - pFsm->FpGetSnapshot = mndSyncGetSnapshotCb; + pFsm->FpCommitCb = mndSyncCommitMsg; + pFsm->FpPreCommitCb = mndSyncPreCommitMsg; + pFsm->FpRollBackCb = mndSyncRollBackMsg; + pFsm->FpGetSnapshot = mndSyncGetSnapshot; return pFsm; } @@ -195,18 +182,11 @@ int32_t mndInitSync(SMnode *pMnode) { return -1; } - if (mndRestoreWal(pMnode) < 0) { - mError("failed to restore wal since %s", terrstr()); - return -1; - } - if (pMnode->selfId == 1) { pMgmt->state = TAOS_SYNC_STATE_LEADER; } - - // pMgmt->pSyncNode = NULL; - SSyncInfo syncInfo; - syncInfo.vgId = 1; + + SSyncInfo syncInfo = {.vgId = 1}; SSyncCfg *pCfg = &(syncInfo.syncCfg); pCfg->replicaNum = pMnode->replica; pCfg->myIndex = pMnode->selfIndex; @@ -216,9 +196,8 @@ int32_t mndInitSync(SMnode *pMnode) { } snprintf(syncInfo.path, sizeof(syncInfo.path), "%s/sync", pMnode->path); syncInfo.pWal = pMnode->syncMgmt.pWal; - - syncInfo.pFsm = syncMnodeMakeFsm(pMnode); - syncInfo.FpSendMsg = mndSendMsg; + syncInfo.pFsm = mndSyncMakeFsm(pMnode); + syncInfo.FpSendMsg = mndSyncSendMsg; syncInfo.FpEqMsg = mndSyncEqMsg; pMnode->syncMgmt.sync = syncOpen(&syncInfo); diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 02f4e0872c780223564309fd5cacd293b22a9801..297174beb7b9ab1535d301e6e12f7ee7f4cd2bb0 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -343,25 +343,18 @@ int32_t mndStart(SMnode *pMnode) { void mndStop(SMnode *pMnode) { syncStop(pMnode->syncMgmt.sync); - return mndCleanupTimer(pMnode); + return mndCleanupTimer(pMnode); } int32_t mndProcessApplyMsg(SRpcMsg *pMsg) { - SSdbRaw *pRaw = pMsg->pCont; - SMnode *pMnode = pMsg->info.node; - int32_t code = sdbWriteWithoutFree(pMnode->pSdb, pRaw); - rpcFreeCont(pMsg->pCont); - - return code; + SMnode *pMnode = pMsg->info.node; + return sdbWriteWithoutFree(pMnode->pSdb, pRaw); } -#include "syncTools.h" - int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; void *ahandle = pMsg->info.ahandle; - int32_t ret = TAOS_SYNC_PROPOSE_OTHER_ERROR; if (syncEnvIsStart()) { diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 9c02b91ef0043eb899493e1b9d4706eff430bb55..f4f09d0f7b3cbedff88c0c97ad5d3d4b68013c0c 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -13,7 +13,6 @@ * along with this program. If not, see . */ -#include #include "sync.h" #include "syncAppendEntries.h" #include "syncAppendEntriesReply.h"