提交 89c1e823 编写于 作者: S Shengliang Guan

refactor: sync integrate into mnode

上级 c38a8055
...@@ -75,9 +75,10 @@ typedef struct { ...@@ -75,9 +75,10 @@ typedef struct {
} STelemMgmt; } STelemMgmt;
typedef struct { typedef struct {
SWal *pWal;
int32_t errCode; int32_t errCode;
bool restored;
sem_t syncSem; sem_t syncSem;
SWal *pWal;
int64_t sync; int64_t sync;
ESyncState state; ESyncState state;
} SSyncMgmt; } SSyncMgmt;
......
...@@ -25,7 +25,10 @@ extern "C" { ...@@ -25,7 +25,10 @@ extern "C" {
int32_t mndInitSync(SMnode *pMnode); int32_t mndInitSync(SMnode *pMnode);
void mndCleanupSync(SMnode *pMnode); void mndCleanupSync(SMnode *pMnode);
bool mndIsMaster(SMnode *pMnode); bool mndIsMaster(SMnode *pMnode);
bool mndIsRestored(SMnode *pMnode);
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw); int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw);
void mndSyncStart(SMnode *pMnode);
void mndSyncStop(SMnode *pMnode);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -68,7 +68,6 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) { ...@@ -68,7 +68,6 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
int32_t mndInitSync(SMnode *pMnode) { int32_t mndInitSync(SMnode *pMnode) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
tsem_init(&pMgmt->syncSem, 0, 0);
char path[PATH_MAX + 20] = {0}; char path[PATH_MAX + 20] = {0};
snprintf(path, sizeof(path), "%s%swal", pMnode->path, TD_DIRSEP); snprintf(path, sizeof(path), "%s%swal", pMnode->path, TD_DIRSEP);
...@@ -102,6 +101,7 @@ int32_t mndInitSync(SMnode *pMnode) { ...@@ -102,6 +101,7 @@ int32_t mndInitSync(SMnode *pMnode) {
pNode->nodePort = pMnode->replicas[i].port; pNode->nodePort = pMnode->replicas[i].port;
} }
tsem_init(&pMgmt->syncSem, 0, 0);
pMgmt->sync = syncOpen(&syncInfo); pMgmt->sync = syncOpen(&syncInfo);
if (pMgmt->sync <= 0) { if (pMgmt->sync <= 0) {
mError("failed to open sync since %s", terrstr()); mError("failed to open sync since %s", terrstr());
...@@ -146,8 +146,30 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) { ...@@ -146,8 +146,30 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) {
return pMgmt->errCode; return pMgmt->errCode;
} }
void mndSyncStart(SMnode *pMnode) {
SSdb *pSdb = pMnode->pSdb;
int64_t lastApplyIndex = sdbGetApplyIndex(pSdb);
syncSetMsgCb(pMnode->syncMgmt.sync, &pMnode->msgCb);
syncStart(pMnode->syncMgmt.sync);
int64_t applyIndex = sdbGetApplyIndex(pSdb);
mndTransPullup(pMnode);
mDebug("pullup trans finished, applyIndex:%" PRId64, applyIndex);
if (applyIndex != lastApplyIndex) {
mInfo("sdb restored from %" PRId64 " to %" PRId64 ", write file", lastApplyIndex, applyIndex);
sdbWriteFile(pSdb);
}
pMnode->syncMgmt.restored = true;
}
void mndSyncStop(SMnode *pMnode) { syncStop(pMnode->syncMgmt.sync); }
bool mndIsMaster(SMnode *pMnode) { bool mndIsMaster(SMnode *pMnode) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
pMgmt->state = syncGetMyRole(pMgmt->sync); pMgmt->state = syncGetMyRole(pMgmt->sync);
return pMgmt->state == TAOS_SYNC_STATE_LEADER; return pMgmt->state == TAOS_SYNC_STATE_LEADER;
} }
bool mndIsRestored(SMnode *pMnode) { return pMnode->syncMgmt.restored; }
\ No newline at end of file
...@@ -682,16 +682,6 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) { ...@@ -682,16 +682,6 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
} }
mDebug("trans:%d, sync finished", pTrans->id); mDebug("trans:%d, sync finished", pTrans->id);
// do it in state machine commit cb
#if 0
code = sdbWriteWithout(pMnode->pSdb, pRaw);
if (code != 0) {
mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr());
return -1;
}
#endif
return 0; return 0;
} }
......
...@@ -86,7 +86,7 @@ static void *mndThreadFp(void *param) { ...@@ -86,7 +86,7 @@ static void *mndThreadFp(void *param) {
lastTime++; lastTime++;
taosMsleep(100); taosMsleep(100);
if (pMnode->stopped) break; if (pMnode->stopped) break;
if (!mndIsMaster(pMnode)) continue; if (!mndIsMaster(pMnode) || !mndIsRestored(pMnode)) continue;
if (lastTime % (tsTransPullupInterval * 10) == 0) { if (lastTime % (tsTransPullupInterval * 10) == 0) {
mndPullupTrans(pMnode); mndPullupTrans(pMnode);
...@@ -337,13 +337,12 @@ int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) { ...@@ -337,13 +337,12 @@ int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) {
} }
int32_t mndStart(SMnode *pMnode) { int32_t mndStart(SMnode *pMnode) {
syncSetMsgCb(pMnode->syncMgmt.sync, &pMnode->msgCb); mndSyncStart(pMnode);
syncStart(pMnode->syncMgmt.sync);
return mndInitTimer(pMnode); return mndInitTimer(pMnode);
} }
void mndStop(SMnode *pMnode) { void mndStop(SMnode *pMnode) {
syncStop(pMnode->syncMgmt.sync); mndSyncStop(pMnode);
return mndCleanupTimer(pMnode); return mndCleanupTimer(pMnode);
} }
...@@ -357,7 +356,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { ...@@ -357,7 +356,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
void *ahandle = pMsg->info.ahandle; void *ahandle = pMsg->info.ahandle;
int32_t ret = TAOS_SYNC_PROPOSE_OTHER_ERROR; int32_t ret = TAOS_SYNC_PROPOSE_OTHER_ERROR;
if (syncEnvIsStart()) { if (syncEnvIsStart()) {
SSyncNode *pSyncNode = syncNodeAcquire(pMnode->syncMgmt.sync); SSyncNode *pSyncNode = syncNodeAcquire(pMnode->syncMgmt.sync);
assert(pSyncNode != NULL); assert(pSyncNode != NULL);
...@@ -444,7 +443,6 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { ...@@ -444,7 +443,6 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
return ret; return ret;
return 0; return 0;
} }
...@@ -454,7 +452,7 @@ int32_t mndProcessMsg(SRpcMsg *pMsg) { ...@@ -454,7 +452,7 @@ int32_t mndProcessMsg(SRpcMsg *pMsg) {
mTrace("msg:%p, will be processed, type:%s app:%p", pMsg, TMSG_INFO(pMsg->msgType), ahandle); mTrace("msg:%p, will be processed, type:%s app:%p", pMsg, TMSG_INFO(pMsg->msgType), ahandle);
if (IsReq(pMsg)) { if (IsReq(pMsg)) {
if (!mndIsMaster(pMnode)) { if (!mndIsMaster(pMnode) || !mndIsRestored(pMnode)) {
terrno = TSDB_CODE_APP_NOT_READY; terrno = TSDB_CODE_APP_NOT_READY;
mDebug("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle); mDebug("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle);
return -1; return -1;
...@@ -514,7 +512,7 @@ int64_t mndGenerateUid(char *name, int32_t len) { ...@@ -514,7 +512,7 @@ int64_t mndGenerateUid(char *name, int32_t len) {
int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo, int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
SMonGrantInfo *pGrantInfo) { SMonGrantInfo *pGrantInfo) {
if (!mndIsMaster(pMnode)) return -1; if (!mndIsMaster(pMnode) || !mndIsRestored(pMnode)) return -1;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
int64_t ms = taosGetTimestampMs(); int64_t ms = taosGetTimestampMs();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册