提交 327b938c 编写于 作者: M Minghao Li

refactor(sync): make leader life longer

上级 e23c969d
......@@ -212,6 +212,7 @@ void syncNodeRelease(SSyncNode* pNode);
// raft state change --------------
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term);
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term);
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr);
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr);
......
......@@ -1987,6 +1987,12 @@ void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
}
}
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
if (term > pSyncNode->pRaftStore->currentTerm) {
raftStoreSetTerm(pSyncNode->pRaftStore, term);
}
}
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
// maybe clear leader cache
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
......@@ -2614,7 +2620,7 @@ int32_t syncNodeOnClientRequestBatchCb(SSyncNode* ths, SyncClientRequestBatch* p
// fsync once
SSyncLogStoreData* pData = ths->pLogStore->data;
SWal* pWal = pData->pWal;
walFsync(pWal, true);
walFsync(pWal, false);
if (ths->replicaNum > 1) {
// if multi replica, start replicate right now
......@@ -2801,7 +2807,7 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
ESyncState state = flag;
char eventLog[128];
snprintf(eventLog, sizeof(eventLog), "commit wal from index:%" PRId64 " to index:%" PRId64, beginIndex, endIndex);
snprintf(eventLog, sizeof(eventLog), "commit by wal from index:%" PRId64 " to index:%" PRId64, beginIndex, endIndex);
syncNodeEventLog(ths, eventLog);
// execute fsm
......
......@@ -237,51 +237,6 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
return 0;
}
#if 0
static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
SyncIndex writeIndex = raftLogWriteIndex(pLogStore);
if (pEntry->index != writeIndex) {
sError("vgId:%d, wal write index error, entry-index:%" PRId64 " update to %" PRId64, pData->pSyncNode->vgId,
pEntry->index, writeIndex);
pEntry->index = writeIndex;
}
int code = 0;
SWalSyncInfo syncMeta;
syncMeta.isWeek = pEntry->isWeak;
syncMeta.seqNum = pEntry->seqNum;
syncMeta.term = pEntry->term;
code = walWriteWithSyncInfo(pWal, pEntry->index, pEntry->originalRpcType, syncMeta, pEntry->data, pEntry->dataLen);
if (code != 0) {
int32_t err = terrno;
const char* errStr = tstrerror(err);
int32_t sysErr = errno;
const char* sysErrStr = strerror(errno);
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "wal write error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s",
pEntry->index, err, err, errStr, sysErr, sysErrStr);
syncNodeErrorLog(pData->pSyncNode, logBuf);
ASSERT(0);
}
// walFsync(pWal, true);
do {
char eventLog[128];
snprintf(eventLog, sizeof(eventLog), "write index:%" PRId64 ", type:%s,%d, type2:%s,%d", pEntry->index,
TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType);
syncNodeEventLog(pData->pSyncNode, eventLog);
} while (0);
return code;
}
#endif
// entry found, return 0
// entry not found, return -1, terrno = TSDB_CODE_WAL_LOG_NOT_EXIST
// other error, return -1
......@@ -400,45 +355,6 @@ static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** pp
//-------------------------------
// log[0 .. n]
#if 0
int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
SyncIndex lastIndex = logStoreLastIndex(pLogStore);
ASSERT(pEntry->index == lastIndex + 1);
int code = 0;
SWalSyncInfo syncMeta;
syncMeta.isWeek = pEntry->isWeak;
syncMeta.seqNum = pEntry->seqNum;
syncMeta.term = pEntry->term;
code = walWriteWithSyncInfo(pWal, pEntry->index, pEntry->originalRpcType, syncMeta, pEntry->data, pEntry->dataLen);
if (code != 0) {
int32_t err = terrno;
const char* errStr = tstrerror(err);
int32_t sysErr = errno;
const char* sysErrStr = strerror(errno);
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "wal write error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s",
pEntry->index, err, err, errStr, sysErr, sysErrStr);
syncNodeErrorLog(pData->pSyncNode, logBuf);
ASSERT(0);
}
// walFsync(pWal, true);
char eventLog[128];
snprintf(eventLog, sizeof(eventLog), "old write index:%" PRId64 ", type:%s,%d, type2:%s,%d", pEntry->index,
TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType);
syncNodeEventLog(pData->pSyncNode, eventLog);
return code;
}
#endif
int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
......
......@@ -55,14 +55,13 @@ int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) {
((pMsg->lastLogTerm == ths->pLogStore->getLastTerm(ths->pLogStore)) &&
(pMsg->lastLogIndex >= ths->pLogStore->getLastIndex(ths->pLogStore)));
// log not ok, do not update term, ignore it
if (pMsg->term > ths->pRaftStore->currentTerm && !logOK) {
return -1;
}
// maybe update term
if (pMsg->term > ths->pRaftStore->currentTerm) {
syncNodeUpdateTerm(ths, pMsg->term);
if (logOK) {
syncNodeUpdateTerm(ths, pMsg->term);
} else {
syncNodeUpdateTermWithoutStepDown(ths, pMsg->term);
}
}
ASSERT(pMsg->term <= ths->pRaftStore->currentTerm);
......@@ -164,13 +163,18 @@ int32_t syncNodeOnRequestVoteSnapshotCb(SSyncNode* ths, SyncRequestVote* pMsg) {
return -1;
}
bool logOK = syncNodeOnRequestVoteLogOK(ths, pMsg);
// maybe update term
if (pMsg->term > ths->pRaftStore->currentTerm) {
syncNodeUpdateTerm(ths, pMsg->term);
if (logOK) {
syncNodeUpdateTerm(ths, pMsg->term);
} else {
syncNodeUpdateTermWithoutStepDown(ths, pMsg->term);
}
}
ASSERT(pMsg->term <= ths->pRaftStore->currentTerm);
bool logOK = syncNodeOnRequestVoteLogOK(ths, pMsg);
bool grant = (pMsg->term == ths->pRaftStore->currentTerm) && logOK &&
((!raftStoreHasVoted(ths->pRaftStore)) || (syncUtilSameId(&(ths->pRaftStore->voteFor), &(pMsg->srcId))));
if (grant) {
......
......@@ -81,4 +81,16 @@ for file in `ls ${logpath}/log.dnode*vgId*`;do
done
echo ""
echo "generate log.commit ..."
tmpfile=${logpath}/log.commits.tmp
touch ${tmpfile}
for file in `ls ${logpath}/log.dnode*.vgId*.commit`;do
line=`cat ${file} | tail -n1`
echo $line | awk '{print $5, $0}' >> ${tmpfile}
done
cat ${tmpfile} | sort -k1 > ${logpath}/log.commits
exit 0
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册