提交 a4d6d84e 编写于 作者: D dmchen

upperindex

上级 8157b850
......@@ -514,6 +514,7 @@ SSyncState syncGetState(int64_t rid) {
} else {
state.canRead = state.restored;
}
/*
double progress = 0;
if(pSyncNode->pLogBuf->totalIndex > 0 && pSyncNode->pLogBuf->commitIndex > 0){
progress = (double)pSyncNode->pLogBuf->commitIndex/(double)pSyncNode->pLogBuf->totalIndex;
......@@ -526,6 +527,7 @@ SSyncState syncGetState(int64_t rid) {
"progress:%lf, progress:%d",
pSyncNode->vgId,
pSyncNode->pLogBuf->commitIndex, pSyncNode->pLogBuf->totalIndex, progress, state.progress);
*/
syncNodeRelease(pSyncNode);
}
......
......@@ -484,8 +484,7 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* p
taosMsleep(1);
goto _out;
}
ASSERT(pEntry->index == pBuf->matchIndex);
if(pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE){
if(pNode->pLogBuf->commitIndex == pEntry->index -1){
sInfo("vgId:%d, to change config at %s. "
......@@ -518,6 +517,8 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* p
// replicate on demand
(void)syncNodeReplicateWithoutLock(pNode);
ASSERT(pEntry->index == pBuf->matchIndex);
// update my match index
matchIndex = pBuf->matchIndex;
syncIndexMgrSetIndex(pNode->pMatchIndex, &pNode->myRaftId, pBuf->matchIndex);
......@@ -535,11 +536,12 @@ _out:
int32_t syncFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry,
int32_t applyCode, bool force) {
if (pNode->replicaNum == 1 &&
pNode->raftCfg.cfg.nodeInfo[pNode->raftCfg.cfg.myIndex].nodeRole != TAOS_SYNC_ROLE_LEARNER &&
pNode->restoreFinish && pNode->vgId != 1 /*&& pEntry->originalRpcType != TDMT_SYNC_CONFIG_CHANGE*/
//learner need to execute fsm when it catch up entry log
//if force is true, keep all contition check to execute fsm
if (pNode->replicaNum == 1 && pNode->restoreFinish && pNode->vgId != 1
&& pNode->raftCfg.cfg.nodeInfo[pNode->raftCfg.cfg.myIndex].nodeRole != TAOS_SYNC_ROLE_LEARNER
&& force == false) {
sDebug("vgId:%d, not to execute, index:%" PRId64 ", term:%" PRId64 ", type:%s code:0x%x, replicaNum:%d,"
sDebug("vgId:%d, not to execute fsm, index:%" PRId64 ", term:%" PRId64 ", type:%s code:0x%x, replicaNum:%d,"
"role:%d, restoreFinish:%d",
pNode->vgId, pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType), applyCode,
pNode->replicaNum, pNode->raftCfg.cfg.nodeInfo[pNode->raftCfg.cfg.myIndex].nodeRole, pNode->restoreFinish);
......@@ -653,17 +655,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
goto _out;
}
//TODO cdm tmp code
if(index + 1 > upperIndex){
sInfo("vgId:%d, exeed upperIndex. index + 1:%" PRId64 ", term:%" PRId64
", role:%d, current term:%" PRId64 ", upperIndex:%" PRId64 ", commitIndex:%" PRId64
"matchIndex:%" PRId64,
vgId, pNextEntry->index, pNextEntry->term, role, currentTerm, upperIndex,
commitIndex, pBuf->matchIndex);
}
//for 2->1, need to apply config change entry in sync thead
//for 2->1, need to apply config change entry in sync thread,
if(pNode->replicaNum == 1){
if (syncFsmExecute(pNode, pFsm, role, currentTerm, pNextEntry, 0, true) != 0) {
sError("vgId:%d, failed to execute sync log entry. index:%" PRId64 ", term:%" PRId64
......@@ -684,13 +676,13 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
pNextEntry = NULL;
}
}
if (!inBuf) {
syncEntryDestroy(pEntry);
pEntry = NULL;
}
}
if (!inBuf) {
syncEntryDestroy(pEntry);
pEntry = NULL;
}
// recycle
SyncIndex until = pBuf->commitIndex - TSDB_SYNC_LOG_BUFFER_RETENTION;
for (SyncIndex index = pBuf->startIndex; index < until; index++) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册