提交 32d48f3b 编写于 作者: S Shengliang Guan

refactor: sync integrate into mnode

上级 e06f9fd0
......@@ -46,9 +46,8 @@ static void mndCloseWal(SMnode *pMnode) {
}
static int32_t mndRestoreWal(SMnode *pMnode) {
// do nothing
return 0;
// do nothing
return 0;
#if 0
......@@ -122,7 +121,6 @@ _OVER:
return code;
#endif
}
int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); }
......@@ -132,7 +130,7 @@ int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq
void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
SyncIndex beginIndex = SYNC_INDEX_INVALID;
if (pFsm->FpGetSnapshot != NULL) {
SSnapshot snapshot;
SSnapshot snapshot = {0};
pFsm->FpGetSnapshot(pFsm, &snapshot);
beginIndex = snapshot.lastApplyIndex;
}
......@@ -141,8 +139,9 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM
SMnode *pMnode = pFsm->data;
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
mndProcessApplyMsg((SRpcMsg*)pMsg);
//mmPutNodeMsgToApplyQueue(pMnode->pWrapper->pMgmt, pMsg);
SRpcMsg *pApplyMsg = (SRpcMsg *)pMsg;
pApplyMsg->info.node = pFsm->data;
mndProcessApplyMsg(pApplyMsg);
if (cbMeta.state == TAOS_SYNC_STATE_LEADER) {
tsem_post(&pMgmt->syncSem);
......@@ -182,19 +181,16 @@ int32_t mndInitSync(SMnode *pMnode) {
return -1;
}
if (pMnode->selfId == 1) {
pMgmt->state = TAOS_SYNC_STATE_LEADER;
}
SSyncInfo syncInfo = {.vgId = 1};
SSyncCfg *pCfg = &(syncInfo.syncCfg);
pCfg->replicaNum = pMnode->replica;
pCfg->myIndex = pMnode->selfIndex;
for (int i = 0; i < pMnode->replica; ++i) {
snprintf((pCfg->nodeInfo)->nodeFqdn, sizeof((pCfg->nodeInfo)->nodeFqdn), "%s", (pMnode->replicas)[i].fqdn);
(pCfg->nodeInfo)->nodePort = (pMnode->replicas)[i].port;
for (int32_t i = 0; i < pMnode->replica; ++i) {
SNodeInfo *pNodeInfo = &pCfg->nodeInfo[i];
tstrncpy(pNodeInfo->nodeFqdn, pMnode->replicas[i].fqdn, sizeof(pNodeInfo->nodeFqdn));
pNodeInfo->nodePort = pMnode->replicas[i].port;
}
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s/sync", pMnode->path);
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP);
syncInfo.pWal = pMnode->syncMgmt.pWal;
syncInfo.pFsm = mndSyncMakeFsm(pMnode);
syncInfo.FpSendMsg = mndSyncSendMsg;
......@@ -242,31 +238,38 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) {
#else
if (pMnode->replica == 1) return 0;
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
pMgmt->errCode = 0;
//SSyncBuffer buf = {.data = pRaw, .len = sdbGetRawTotalSize(pRaw)};
SRpcMsg rpcMsg;
rpcMsg.code = TDMT_MND_APPLY_MSG;
rpcMsg.contLen = sdbGetRawTotalSize(pRaw);
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
memcpy(rpcMsg.pCont, pRaw, rpcMsg.contLen);
SRpcMsg rsp = {0};
rsp.code = TDMT_MND_APPLY_MSG;
rsp.contLen = sdbGetRawTotalSize(pRaw);
rsp.pCont = rpcMallocCont(rsp.contLen);
memcpy(rsp.pCont, pRaw, rsp.contLen);
bool isWeak = false;
int32_t code = syncPropose(pMgmt->sync, &rpcMsg, isWeak);
int32_t code = syncPropose(pMgmt->sync, &rsp, isWeak);
if (code == 0) {
tsem_wait(&pMgmt->syncSem);
} else if (code == TAOS_SYNC_PROPOSE_NOT_LEADER) {
terrno = TSDB_CODE_APP_NOT_READY;
mError("failed to propose raw:%p since not leader", pRaw);
return -1;
} else if (code == TAOS_SYNC_PROPOSE_OTHER_ERROR) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
mError("failed to propose raw:%p since sync internal error", pRaw);
} else {
assert(0);
}
if (code != 0) return code;
tsem_wait(&pMgmt->syncSem);
return pMgmt->errCode;
#endif
}
bool mndIsMaster(SMnode *pMnode) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
pMgmt->state = syncGetMyRole(pMgmt->sync);
return pMgmt->state == TAOS_SYNC_STATE_LEADER;
}
......@@ -336,8 +336,9 @@ int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) {
return 0;
}
int32_t mndStart(SMnode *pMnode) {
syncStart(pMnode->syncMgmt.sync);
int32_t mndStart(SMnode *pMnode) {
syncSetMsgCb(pMnode->syncMgmt.sync, &pMnode->msgCb);
syncStart(pMnode->syncMgmt.sync);
return mndInitTimer(pMnode);
}
......
......@@ -31,11 +31,9 @@ SSdb *sdbInit(SSdbOpt *pOption) {
char path[PATH_MAX + 100] = {0};
snprintf(path, sizeof(path), "%s%sdata", pOption->path, TD_DIRSEP);
pSdb->currDir = strdup(path);
snprintf(path, sizeof(path), "%s%ssync", pOption->path, TD_DIRSEP);
pSdb->syncDir = strdup(path);
snprintf(path, sizeof(path), "%s%stmp", pOption->path, TD_DIRSEP);
pSdb->tmpDir = strdup(path);
if (pSdb->currDir == NULL || pSdb->currDir == NULL || pSdb->currDir == NULL) {
if (pSdb->currDir == NULL || pSdb->tmpDir == NULL) {
sdbCleanup(pSdb);
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed to init sdb since %s", terrstr());
......@@ -149,12 +147,6 @@ static int32_t sdbCreateDir(SSdb *pSdb) {
return -1;
}
if (taosMkDir(pSdb->syncDir) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to create dir:%s since %s", pSdb->syncDir, terrstr());
return -1;
}
if (taosMkDir(pSdb->tmpDir) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to create dir:%s since %s", pSdb->tmpDir, terrstr());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册