提交 3e6d6fe8 编写于 作者: M Minghao Li

refactor(sync): do replicate in timer routine

上级 357951e9
......@@ -1259,11 +1259,9 @@ void syncNodeStart(SSyncNode* pSyncNode) {
syncNodeBecomeFollower(pSyncNode, "first start");
}
if (syncNodeIsMnode(pSyncNode)) {
int32_t ret = 0;
ret = syncNodeStartPingTimer(pSyncNode);
ASSERT(ret == 0);
}
int32_t ret = 0;
ret = syncNodeStartPingTimer(pSyncNode);
ASSERT(ret == 0);
}
void syncNodeStartStandBy(SSyncNode* pSyncNode) {
......@@ -1276,11 +1274,9 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) {
int32_t ret = syncNodeRestartElectTimer(pSyncNode, electMS);
ASSERT(ret == 0);
if (pSyncNode->vgId == 1) {
int32_t ret = 0;
ret = syncNodeStartPingTimer(pSyncNode);
ASSERT(ret == 0);
}
ret = 0;
ret = syncNodeStartPingTimer(pSyncNode);
ASSERT(ret == 0);
}
void syncNodeClose(SSyncNode* pSyncNode) {
......
......@@ -55,7 +55,12 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId) {
// maybe start snapshot
SyncIndex logStartIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
SyncIndex logEndIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
if (nextIndex < logStartIndex || nextIndex > logEndIndex) {
if (nextIndex < logStartIndex || nextIndex - 1 > logEndIndex) {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "start snapshot for next-index:%ld, start:%ld, end:%ld", nextIndex, logStartIndex,
logEndIndex);
syncNodeEventLog(pSyncNode, logBuf);
// start snapshot
int32_t code = syncNodeStartSnapshot(pSyncNode, pDestId);
ASSERT(code == 0);
......@@ -128,12 +133,12 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId) {
}
int32_t syncNodeReplicate(SSyncNode* pSyncNode) {
syncNodeEventLog(pSyncNode, "do replicate");
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
return -1;
}
syncNodeEventLog(pSyncNode, "do replicate");
int32_t ret = 0;
for (int i = 0; i < pSyncNode->peersNum; ++i) {
SRaftId* pDestId = &(pSyncNode->peersId[i]);
......@@ -170,7 +175,17 @@ int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* dest
int32_t ret = 0;
if (syncNodeNeedSendAppendEntries(pSyncNode, destRaftId, pMsg)) {
ret = syncNodeSendAppendEntries(pSyncNode, destRaftId, pMsg);
} else {
char logBuf[128];
char host[64];
int16_t port;
syncUtilU642Addr(destRaftId->addr, host, sizeof(host), &port);
snprintf(logBuf, sizeof(logBuf), "do not repcate to %s:%d for index:%ld", host, port, pMsg->prevLogIndex + 1);
syncNodeEventLog(pSyncNode, logBuf);
}
return ret;
}
......
......@@ -60,6 +60,9 @@ static void syncNodeCleanConfigIndex(SSyncNode* ths) {
int32_t syncNodeTimerRoutine(SSyncNode* ths) {
syncNodeEventLog(ths, "timer routines");
// timer replicate
syncNodeReplicate(ths);
if (syncNodeIsMnode(ths)) {
syncNodeCleanConfigIndex(ths);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册