diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index f4a6f7a4c93f778ce738ed0cf9ceae63bcb4b6e6..09d1f8c0139102913df52605fbe19933bf21b73c 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -27,7 +27,6 @@ typedef struct SMnodeMsg SMnodeMsg; typedef void (*SendMsgToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg); typedef void (*SendMsgToMnodeFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); typedef void (*SendRedirectMsgFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); -typedef int32_t (*PutMsgToMnodeQFp)(SDnode *pDnode, SMnodeMsg *pMsg); typedef struct SMnodeLoad { int64_t numOfDnode; @@ -63,7 +62,6 @@ typedef struct { SReplica replicas[TSDB_MAX_REPLICA]; SMnodeCfg cfg; SDnode *pDnode; - PutMsgToMnodeQFp putMsgToApplyMsgFp; SendMsgToDnodeFp sendMsgToDnodeFp; SendMsgToMnodeFp sendMsgToMnodeFp; SendRedirectMsgFp sendRedirectMsgFp; @@ -172,14 +170,6 @@ void mndProcessWriteMsg(SMnodeMsg *pMsg); */ void mndProcessSyncMsg(SMnodeMsg *pMsg); -/** - * @brief Process the apply request. - * - * @param pMsg The request msg. - * @return int32_t 0 for success, -1 for failure. - */ -void mndProcessApplyMsg(SMnodeMsg *pMsg); - #ifdef __cplusplus } #endif diff --git a/source/dnode/mgmt/impl/inc/dndInt.h b/source/dnode/mgmt/impl/inc/dndInt.h index 8973574d23fdfff01499f6fcf7e51a4312956c5f..6f1357e9c1c843fc46eff534cf57b1f265a5ee0e 100644 --- a/source/dnode/mgmt/impl/inc/dndInt.h +++ b/source/dnode/mgmt/impl/inc/dndInt.h @@ -80,7 +80,6 @@ typedef struct { SRWLatch latch; taos_queue pReadQ; taos_queue pWriteQ; - taos_queue pApplyQ; taos_queue pSyncQ; taos_queue pMgmtQ; SWorkerPool mgmtPool; diff --git a/source/dnode/mgmt/impl/src/dndDnode.c b/source/dnode/mgmt/impl/src/dndDnode.c index 6b5aeb078a0df484fedab479ac1b44f6435886fe..af86e5951811a774657cfe991015ad8bc9fa75e8 100644 --- a/source/dnode/mgmt/impl/src/dndDnode.c +++ b/source/dnode/mgmt/impl/src/dndDnode.c @@ -369,7 +369,7 @@ void dndSendStatusMsg(SDnode *pDnode) { dndGetVnodeLoads(pDnode, &pStatus->vnodeLoads); contLen = sizeof(SStatusMsg) + pStatus->vnodeLoads.num * sizeof(SVnodeLoad); - SRpcMsg rpcMsg = {.pCont = pStatus, .contLen = contLen, .msgType = TDMT_MND_STATUS, .ahandle = 9527}; + SRpcMsg rpcMsg = {.pCont = pStatus, .contLen = contLen, .msgType = TDMT_MND_STATUS, .ahandle = (void *)9527}; pMgmt->statusSent = 1; dTrace("pDnode:%p, send status msg to mnode", pDnode); diff --git a/source/dnode/mgmt/impl/src/dndMnode.c b/source/dnode/mgmt/impl/src/dndMnode.c index c62c05fe2faadf177e7186b18653d8d2e589d559..8fbb473af10a04091d930af844c329fd0e3bfb3c 100644 --- a/source/dnode/mgmt/impl/src/dndMnode.c +++ b/source/dnode/mgmt/impl/src/dndMnode.c @@ -28,18 +28,15 @@ static void dndCleanupMnodeSyncWorker(SDnode *pDnode); static void dndCleanupMnodeMgmtWorker(SDnode *pDnode); static int32_t dndAllocMnodeReadQueue(SDnode *pDnode); static int32_t dndAllocMnodeWriteQueue(SDnode *pDnode); -static int32_t dndAllocMnodeApplyQueue(SDnode *pDnode); static int32_t dndAllocMnodeSyncQueue(SDnode *pDnode); static int32_t dndAllocMnodeMgmtQueue(SDnode *pDnode); static void dndFreeMnodeReadQueue(SDnode *pDnode); static void dndFreeMnodeWriteQueue(SDnode *pDnode); -static void dndFreeMnodeApplyQueue(SDnode *pDnode); static void dndFreeMnodeSyncQueue(SDnode *pDnode); static void dndFreeMnodeMgmtQueue(SDnode *pDnode); static void dndProcessMnodeReadQueue(SDnode *pDnode, SMnodeMsg *pMsg); static void dndProcessMnodeWriteQueue(SDnode *pDnode, SMnodeMsg *pMsg); -static void dndProcessMnodeApplyQueue(SDnode *pDnode, SMnodeMsg *pMsg); static void dndProcessMnodeSyncQueue(SDnode *pDnode, SMnodeMsg *pMsg); static void dndProcessMnodeMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg); static int32_t dndWriteMnodeMsgToQueue(SMnode *pMnode, taos_queue pQueue, SRpcMsg *pRpcMsg); @@ -47,7 +44,6 @@ void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEp void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessMnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessMnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); -static int32_t dndPutMsgIntoMnodeApplyQueue(SDnode *pDnode, SMnodeMsg *pMsg); static int32_t dndStartMnodeWorker(SDnode *pDnode); static void dndStopMnodeWorker(SDnode *pDnode); @@ -271,11 +267,6 @@ static int32_t dndStartMnodeWorker(SDnode *pDnode) { return -1; } - if (dndAllocMnodeApplyQueue(pDnode) != 0) { - dError("failed to alloc mnode apply queue since %s", terrstr()); - return -1; - } - if (dndAllocMnodeSyncQueue(pDnode) != 0) { dError("failed to alloc mnode sync queue since %s", terrstr()); return -1; @@ -293,7 +284,6 @@ static void dndStopMnodeWorker(SDnode *pDnode) { while (pMgmt->refCount > 1) taosMsleep(10); while (!taosQueueEmpty(pMgmt->pReadQ)) taosMsleep(10); - while (!taosQueueEmpty(pMgmt->pApplyQ)) taosMsleep(10); while (!taosQueueEmpty(pMgmt->pWriteQ)) taosMsleep(10); while (!taosQueueEmpty(pMgmt->pSyncQ)) taosMsleep(10); @@ -303,7 +293,6 @@ static void dndStopMnodeWorker(SDnode *pDnode) { dndFreeMnodeReadQueue(pDnode); dndFreeMnodeWriteQueue(pDnode); - dndFreeMnodeApplyQueue(pDnode); dndFreeMnodeSyncQueue(pDnode); } @@ -328,7 +317,6 @@ static void dndInitMnodeOption(SDnode *pDnode, SMnodeOpt *pOption) { pOption->sendMsgToDnodeFp = dndSendMsgToDnode; pOption->sendMsgToMnodeFp = dndSendMsgToMnode; pOption->sendRedirectMsgFp = dndSendRedirectMsg; - pOption->putMsgToApplyMsgFp = dndPutMsgIntoMnodeApplyQueue; pOption->dnodeId = dndGetDnodeId(pDnode); pOption->clusterId = dndGetClusterId(pDnode); pOption->cfg.sver = pDnode->opt.sver; @@ -604,20 +592,6 @@ static void dndProcessMnodeWriteQueue(SDnode *pDnode, SMnodeMsg *pMsg) { mndCleanupMsg(pMsg); } -static void dndProcessMnodeApplyQueue(SDnode *pDnode, SMnodeMsg *pMsg) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - - SMnode *pMnode = dndAcquireMnode(pDnode); - if (pMnode != NULL) { - mndProcessApplyMsg(pMsg); - dndReleaseMnode(pDnode, pMnode); - } else { - mndSendRsp(pMsg, terrno); - } - - mndCleanupMsg(pMsg); -} - static void dndProcessMnodeSyncQueue(SDnode *pDnode, SMnodeMsg *pMsg) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; @@ -712,19 +686,6 @@ void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { dndReleaseMnode(pDnode, pMnode); } -static int32_t dndPutMsgIntoMnodeApplyQueue(SDnode *pDnode, SMnodeMsg *pMsg) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - - SMnode *pMnode = dndAcquireMnode(pDnode); - if (pMnode == NULL) { - return -1; - } - - int32_t code = taosWriteQitem(pMgmt->pApplyQ, pMsg); - dndReleaseMnode(pDnode, pMnode); - return code; -} - static int32_t dndAllocMnodeMgmtQueue(SDnode *pDnode) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; pMgmt->pMgmtQ = tWorkerAllocQueue(&pMgmt->mgmtPool, pDnode, (FProcessItem)dndProcessMnodeMgmtQueue); @@ -817,23 +778,6 @@ static void dndFreeMnodeWriteQueue(SDnode *pDnode) { pMgmt->pWriteQ = NULL; } -static int32_t dndAllocMnodeApplyQueue(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - pMgmt->pApplyQ = tWorkerAllocQueue(&pMgmt->writePool, pDnode, (FProcessItem)dndProcessMnodeApplyQueue); - if (pMgmt->pApplyQ == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - return 0; -} - -static void dndFreeMnodeApplyQueue(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - tWorkerFreeQueue(&pMgmt->writePool, pMgmt->pApplyQ); - pMgmt->pApplyQ = NULL; -} - static int32_t dndInitMnodeWriteWorker(SDnode *pDnode) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; SWorkerPool *pPool = &pMgmt->writePool; diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index ba8746c0095b4df3496a7636cb5fec263f5396cb..a9932ce0483f9448b003a40f790eff960a200148 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -61,6 +61,13 @@ typedef struct { char email[TSDB_FQDN_LEN]; } STelemMgmt; +typedef struct { + int32_t errCode; + sem_t syncSem; + SSyncNode *pSyncNode; + ESyncState state; +} SSyncMgmt; + typedef struct SMnode { int32_t dnodeId; int32_t clusterId; @@ -77,11 +84,11 @@ typedef struct SMnode { SShowMgmt showMgmt; SProfileMgmt profileMgmt; STelemMgmt telemMgmt; + SSyncMgmt syncMgmt; MndMsgFp msgFp[TDMT_MAX]; SendMsgToDnodeFp sendMsgToDnodeFp; SendMsgToMnodeFp sendMsgToMnodeFp; SendRedirectMsgFp sendRedirectMsgFp; - PutMsgToMnodeQFp putMsgToApplyMsgFp; } SMnode; void mndSendMsgToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *rpcMsg); diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 5e9165f8980ae9946f86273c6931481a85a49145..6a2fca836f1c3446aad94d96aa267c56df46e459 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -14,26 +14,54 @@ */ #define _DEFAULT_SOURCE -#include "os.h" -#include "mndInt.h" -#include "mndTrans.h" +#include "mndSync.h" -int32_t mndInitSync(SMnode *pMnode) { return 0; } -void mndCleanupSync(SMnode *pMnode) {} +int32_t mndInitSync(SMnode *pMnode) { + SSyncMgmt *pMgmt = &pMnode->syncMgmt; + tsem_init(&pMgmt->syncSem, 0, 0); + + pMgmt->state = TAOS_SYNC_STATE_LEADER; + pMgmt->pSyncNode = NULL; + return 0; +} + +void mndCleanupSync(SMnode *pMnode) { + SSyncMgmt *pMgmt = &pMnode->syncMgmt; + tsem_destroy(&pMgmt->syncSem); +} + +static int32_t mndSyncApplyCb(struct SSyncFSM *fsm, SyncIndex index, const SSyncBuffer *buf, void *pData) { + SMnode *pMnode = pData; + SSyncMgmt *pMgmt = &pMnode->syncMgmt; + + pMgmt->errCode = 0; + tsem_post(&pMgmt->syncSem); + + return 0; +} int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) { - int32_t code = 0; +#if 1 + return 0; +#else + if (pMnode->replica == 1) return 0; + + SSyncMgmt *pMgmt = &pMnode->syncMgmt; + pMgmt->errCode = 0; + + SSyncBuffer buf = {.data = pRaw, .len = sdbGetRawTotalSize(pRaw)}; + + bool isWeak = false; + int32_t code = syncPropose(pMgmt->pSyncNode, &buf, pMnode, isWeak); - // int32_t len = sdbGetRawTotalSize(pRaw); - // SSdbRaw *pReceived = calloc(1, len); - // memcpy(pReceived, pRaw, len); - // mDebug("trans:%d, data:%p recv from sync, code:0x%x pMsg:%p", pMsg->id, pReceived, code & 0xFFFF, pMsg); + if (code != 0) return code; - // mndTransApply(pMnode, pReceived, code); - return code; + tsem_wait(&pMgmt->syncSem); + return pMgmt->errCode; +#endif } bool mndIsMaster(SMnode *pMnode) { - // pMnode->role = TAOS_SYNC_STATE_LEADER; - return true; + SSyncMgmt *pMgmt = &pMnode->syncMgmt; + return pMgmt->state == TAOS_SYNC_STATE_LEADER; } \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 2432e31b9eaf07efcc02d529035f8f2a55b168d7..f63d14e71194f8f49b20f3ee8b3b420a510b4fb3 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -746,7 +746,7 @@ static int32_t mndTransPerformExecuteStage(SMnode *pMnode, STrans *pTrans) { } } - return 0; + return code; } static int32_t mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) { diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 26b1c71a10b5cda278c08866154ffe7c47272cc6..27668a585aeb637dc57e16b6f5f8159c348fc6c6 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -202,7 +202,6 @@ static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) { pMnode->selfIndex = pOption->selfIndex; memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); pMnode->pDnode = pOption->pDnode; - pMnode->putMsgToApplyMsgFp = pOption->putMsgToApplyMsgFp; pMnode->sendMsgToDnodeFp = pOption->sendMsgToDnodeFp; pMnode->sendMsgToMnodeFp = pOption->sendMsgToMnodeFp; pMnode->sendRedirectMsgFp = pOption->sendRedirectMsgFp; @@ -217,8 +216,7 @@ static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) { pMnode->cfg.buildinfo = strdup(pOption->cfg.buildinfo); if (pMnode->sendMsgToDnodeFp == NULL || pMnode->sendMsgToMnodeFp == NULL || pMnode->sendRedirectMsgFp == NULL || - pMnode->putMsgToApplyMsgFp == NULL || pMnode->dnodeId < 0 || pMnode->clusterId < 0 || - pMnode->cfg.statusInterval < 1) { + pMnode->dnodeId < 0 || pMnode->clusterId < 0 || pMnode->cfg.statusInterval < 1) { terrno = TSDB_CODE_MND_INVALID_OPTIONS; return -1; } @@ -438,8 +436,6 @@ void mndProcessWriteMsg(SMnodeMsg *pMsg) { mndProcessRpcMsg(pMsg); } void mndProcessSyncMsg(SMnodeMsg *pMsg) { mndProcessRpcMsg(pMsg); } -void mndProcessApplyMsg(SMnodeMsg *pMsg) {} - uint64_t mndGenerateUid(char *name, int32_t len) { int64_t us = taosGetTimestampUs(); int32_t hashval = MurmurHash3_32(name, len);