diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index d8eb021462565ad09a7f0211ad92ba7b2bc24827..df289aa28508436d3a017eefb7a198a922ceaad1 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -46,9 +46,8 @@ static void mndCloseWal(SMnode *pMnode) { } static int32_t mndRestoreWal(SMnode *pMnode) { - -// do nothing -return 0; + // do nothing + return 0; #if 0 @@ -122,7 +121,6 @@ _OVER: return code; #endif - } int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); } @@ -132,7 +130,7 @@ int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { SyncIndex beginIndex = SYNC_INDEX_INVALID; if (pFsm->FpGetSnapshot != NULL) { - SSnapshot snapshot; + SSnapshot snapshot = {0}; pFsm->FpGetSnapshot(pFsm, &snapshot); beginIndex = snapshot.lastApplyIndex; } @@ -141,8 +139,9 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM SMnode *pMnode = pFsm->data; SSyncMgmt *pMgmt = &pMnode->syncMgmt; - mndProcessApplyMsg((SRpcMsg*)pMsg); - //mmPutNodeMsgToApplyQueue(pMnode->pWrapper->pMgmt, pMsg); + SRpcMsg *pApplyMsg = (SRpcMsg *)pMsg; + pApplyMsg->info.node = pFsm->data; + mndProcessApplyMsg(pApplyMsg); if (cbMeta.state == TAOS_SYNC_STATE_LEADER) { tsem_post(&pMgmt->syncSem); @@ -182,19 +181,16 @@ int32_t mndInitSync(SMnode *pMnode) { return -1; } - if (pMnode->selfId == 1) { - pMgmt->state = TAOS_SYNC_STATE_LEADER; - } - SSyncInfo syncInfo = {.vgId = 1}; SSyncCfg *pCfg = &(syncInfo.syncCfg); pCfg->replicaNum = pMnode->replica; pCfg->myIndex = pMnode->selfIndex; - for (int i = 0; i < pMnode->replica; ++i) { - snprintf((pCfg->nodeInfo)->nodeFqdn, sizeof((pCfg->nodeInfo)->nodeFqdn), "%s", (pMnode->replicas)[i].fqdn); - (pCfg->nodeInfo)->nodePort = (pMnode->replicas)[i].port; + for (int32_t i = 0; i < pMnode->replica; ++i) { + SNodeInfo *pNodeInfo = &pCfg->nodeInfo[i]; + tstrncpy(pNodeInfo->nodeFqdn, pMnode->replicas[i].fqdn, sizeof(pNodeInfo->nodeFqdn)); + pNodeInfo->nodePort = pMnode->replicas[i].port; } - snprintf(syncInfo.path, sizeof(syncInfo.path), "%s/sync", pMnode->path); + snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP); syncInfo.pWal = pMnode->syncMgmt.pWal; syncInfo.pFsm = mndSyncMakeFsm(pMnode); syncInfo.FpSendMsg = mndSyncSendMsg; @@ -242,31 +238,38 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) { #else - if (pMnode->replica == 1) return 0; - SSyncMgmt *pMgmt = &pMnode->syncMgmt; pMgmt->errCode = 0; - //SSyncBuffer buf = {.data = pRaw, .len = sdbGetRawTotalSize(pRaw)}; - - SRpcMsg rpcMsg; - rpcMsg.code = TDMT_MND_APPLY_MSG; - rpcMsg.contLen = sdbGetRawTotalSize(pRaw); - rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); - memcpy(rpcMsg.pCont, pRaw, rpcMsg.contLen); + SRpcMsg rsp = {0}; + rsp.code = TDMT_MND_APPLY_MSG; + rsp.contLen = sdbGetRawTotalSize(pRaw); + rsp.pCont = rpcMallocCont(rsp.contLen); + memcpy(rsp.pCont, pRaw, rsp.contLen); bool isWeak = false; - int32_t code = syncPropose(pMgmt->sync, &rpcMsg, isWeak); + int32_t code = syncPropose(pMgmt->sync, &rsp, isWeak); + if (code == 0) { + tsem_wait(&pMgmt->syncSem); + } else if (code == TAOS_SYNC_PROPOSE_NOT_LEADER) { + terrno = TSDB_CODE_APP_NOT_READY; + mError("failed to propose raw:%p since not leader", pRaw); + return -1; + } else if (code == TAOS_SYNC_PROPOSE_OTHER_ERROR) { + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + mError("failed to propose raw:%p since sync internal error", pRaw); + } else { + assert(0); + } if (code != 0) return code; - tsem_wait(&pMgmt->syncSem); return pMgmt->errCode; #endif - } bool mndIsMaster(SMnode *pMnode) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; + pMgmt->state = syncGetMyRole(pMgmt->sync); return pMgmt->state == TAOS_SYNC_STATE_LEADER; } diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 297174beb7b9ab1535d301e6e12f7ee7f4cd2bb0..9a68e0e1f09ccfd643d3794b24a04cbc115d61c1 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -336,8 +336,9 @@ int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) { return 0; } -int32_t mndStart(SMnode *pMnode) { - syncStart(pMnode->syncMgmt.sync); +int32_t mndStart(SMnode *pMnode) { + syncSetMsgCb(pMnode->syncMgmt.sync, &pMnode->msgCb); + syncStart(pMnode->syncMgmt.sync); return mndInitTimer(pMnode); } diff --git a/source/dnode/mnode/sdb/src/sdb.c b/source/dnode/mnode/sdb/src/sdb.c index 1f11a77e6c7575a8f602bb4720b0445b5c5c0372..6e5dde57a6ddb1f45c60a65542e6ea282836d94c 100644 --- a/source/dnode/mnode/sdb/src/sdb.c +++ b/source/dnode/mnode/sdb/src/sdb.c @@ -31,11 +31,9 @@ SSdb *sdbInit(SSdbOpt *pOption) { char path[PATH_MAX + 100] = {0}; snprintf(path, sizeof(path), "%s%sdata", pOption->path, TD_DIRSEP); pSdb->currDir = strdup(path); - snprintf(path, sizeof(path), "%s%ssync", pOption->path, TD_DIRSEP); - pSdb->syncDir = strdup(path); snprintf(path, sizeof(path), "%s%stmp", pOption->path, TD_DIRSEP); pSdb->tmpDir = strdup(path); - if (pSdb->currDir == NULL || pSdb->currDir == NULL || pSdb->currDir == NULL) { + if (pSdb->currDir == NULL || pSdb->tmpDir == NULL) { sdbCleanup(pSdb); terrno = TSDB_CODE_OUT_OF_MEMORY; mError("failed to init sdb since %s", terrstr()); @@ -149,12 +147,6 @@ static int32_t sdbCreateDir(SSdb *pSdb) { return -1; } - if (taosMkDir(pSdb->syncDir) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - mError("failed to create dir:%s since %s", pSdb->syncDir, terrstr()); - return -1; - } - if (taosMkDir(pSdb->tmpDir) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); mError("failed to create dir:%s since %s", pSdb->tmpDir, terrstr());