提交 092a0747 编写于 作者: M Minghao Li

refactor(sync): process hb and appendentries

上级 a3f8c03a
......@@ -244,6 +244,9 @@ int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg
int32_t syncLeaderTransfer(int64_t rid);
int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader);
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex);
int32_t syncEndSnapshot(int64_t rid);
#ifdef __cplusplus
}
#endif
......
......@@ -696,6 +696,8 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie
int32_t syncNodeOnSnapshotSendCb(SSyncNode* ths, SyncSnapshotSend* pMsg);
int32_t syncNodeOnSnapshotRspCb(SSyncNode* ths, SyncSnapshotRsp* pMsg);
int32_t syncNodeFollowerCommit(SSyncNode* ths, SyncIndex newCommitIndex);
int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg);
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg);
......
......@@ -15,7 +15,7 @@
#include "vnd.h"
#define VND_INFO_FNAME "vnode.json"
#define VND_INFO_FNAME "vnode.json"
#define VND_INFO_FNAME_TMP "vnode_tmp.json"
static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData);
......@@ -236,7 +236,9 @@ int vnodeCommit(SVnode *pVnode) {
ASSERT(0);
return -1;
}
walBeginSnapshot(pVnode->pWal, pVnode->state.applied);
// walBeginSnapshot(pVnode->pWal, pVnode->state.applied);
syncBeginSnapshot(pVnode->sync, pVnode->state.applied);
// preCommit
// smaSyncPreCommit(pVnode->pSma);
......@@ -301,7 +303,8 @@ int vnodeCommit(SVnode *pVnode) {
}
// apply the commit (TODO)
walEndSnapshot(pVnode->pWal);
// walEndSnapshot(pVnode->pWal);
syncEndSnapshot(pVnode->sync);
vInfo("vgId:%d, commit end", TD_VID(pVnode));
......
......@@ -196,6 +196,8 @@ typedef struct SSyncNode {
// is config changing
bool changing;
int64_t snapshottingIndex;
int64_t startTime;
int64_t leaderTime;
int64_t lastReplicateTime;
......
......@@ -51,6 +51,7 @@ typedef struct SSyncSnapshotSender {
int32_t replicaIndex;
SyncTerm term;
SyncTerm privateTerm;
int64_t startTime;
bool finish;
} SSyncSnapshotSender;
......
......@@ -1042,6 +1042,43 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
return ret;
}
int32_t syncNodeFollowerCommit(SSyncNode* ths, SyncIndex newCommitIndex) {
// maybe update commit index, leader notice me
if (newCommitIndex > ths->commitIndex) {
// has commit entry in local
if (newCommitIndex <= ths->pLogStore->syncLogLastIndex(ths->pLogStore)) {
// advance commit index to sanpshot first
SSnapshot snapshot;
ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
if (snapshot.lastApplyIndex >= 0 && snapshot.lastApplyIndex > ths->commitIndex) {
SyncIndex commitBegin = ths->commitIndex;
SyncIndex commitEnd = snapshot.lastApplyIndex;
ths->commitIndex = snapshot.lastApplyIndex;
char eventLog[128];
snprintf(eventLog, sizeof(eventLog), "commit by snapshot from index:%" PRId64 " to index:%" PRId64, commitBegin,
commitEnd);
syncNodeEventLog(ths, eventLog);
}
SyncIndex beginIndex = ths->commitIndex + 1;
SyncIndex endIndex = newCommitIndex;
// update commit index
ths->commitIndex = newCommitIndex;
// call back Wal
int32_t code = ths->pLogStore->updateCommitIndex(ths->pLogStore, ths->commitIndex);
ASSERT(code == 0);
code = syncNodeCommit(ths, beginIndex, endIndex, ths->state);
ASSERT(code == 0);
}
}
return 0;
}
int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) {
// prepare response msg
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
......@@ -1092,16 +1129,25 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) {
SyncIndex appendIndex = pMsg->prevLogIndex + 1;
SSyncRaftEntry* pLocalEntry = NULL;
int32_t code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, appendIndex, &pLocalEntry);
ASSERT(code == 0);
if (pLocalEntry->term == pAppendEntry->term) {
// do nothing
} else {
if (code != 0 && terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) {
code = ths->pLogStore->syncLogTruncate(ths->pLogStore, appendIndex);
ASSERT(code == 0);
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
ASSERT(code == 0);
} else {
ASSERT(code == 0);
if (pLocalEntry->term == pAppendEntry->term) {
// do nothing
} else {
code = ths->pLogStore->syncLogTruncate(ths->pLogStore, appendIndex);
ASSERT(code == 0);
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
ASSERT(code == 0);
}
}
syncEntryDestory(pLocalEntry);
......@@ -1112,37 +1158,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) {
pReply->matchIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
// maybe update commit index, leader notice me
if (pMsg->commitIndex > ths->commitIndex) {
// has commit entry in local
if (pMsg->commitIndex <= ths->pLogStore->syncLogLastIndex(ths->pLogStore)) {
// advance commit index to sanpshot first
SSnapshot snapshot;
ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
if (snapshot.lastApplyIndex >= 0 && snapshot.lastApplyIndex > ths->commitIndex) {
SyncIndex commitBegin = ths->commitIndex;
SyncIndex commitEnd = snapshot.lastApplyIndex;
ths->commitIndex = snapshot.lastApplyIndex;
char eventLog[128];
snprintf(eventLog, sizeof(eventLog), "commit by snapshot from index:%" PRId64 " to index:%" PRId64, commitBegin,
commitEnd);
syncNodeEventLog(ths, eventLog);
}
SyncIndex beginIndex = ths->commitIndex + 1;
SyncIndex endIndex = pMsg->commitIndex;
// update commit index
ths->commitIndex = pMsg->commitIndex;
// call back Wal
int32_t code = ths->pLogStore->updateCommitIndex(ths->pLogStore, ths->commitIndex);
ASSERT(code == 0);
code = syncNodeCommit(ths, beginIndex, endIndex, ths->state);
ASSERT(code == 0);
}
}
syncNodeFollowerCommit(ths, pMsg->commitIndex);
goto _SEND_RESPONSE;
......
......@@ -288,6 +288,51 @@ int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader) {
return ret;
}
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
ASSERT(rid == pSyncNode->rid);
int32_t code = 0;
if (pSyncNode->replicaNum == 1) {
SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
code = walBeginSnapshot(pData->pWal, lastApplyIndex);
} else {
SyncIndex snapshottingIndex = atomic_load_64(&pSyncNode->snapshottingIndex);
if (snapshottingIndex == SYNC_INDEX_INVALID) {
atomic_store_64(&pSyncNode->snapshottingIndex, lastApplyIndex);
SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
code = walBeginSnapshot(pData->pWal, lastApplyIndex);
} else {
sError("vgId:%d snapshotting index:%ld, lastApplyIndex:%ld", snapshottingIndex, lastApplyIndex);
}
}
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return code;
}
int32_t syncEndSnapshot(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
ASSERT(rid == pSyncNode->rid);
SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
int32_t code = walEndSnapshot(pData->pWal);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return code;
}
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) {
if (pSyncNode->peersNum == 0) {
sDebug("only one replica, cannot leader transfer");
......@@ -1231,6 +1276,9 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
pSyncNode->leaderTime = timeNow;
pSyncNode->lastReplicateTime = timeNow;
// snapshotting
atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
syncNodeEventLog(pSyncNode, "sync open");
return pSyncNode;
......@@ -1423,11 +1471,13 @@ int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
&pSyncNode->pElectTimer);
atomic_store_64(&pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser);
do {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "elect timer reset, ms:%d", ms);
syncNodeEventLog(pSyncNode, logBuf);
} while (0);
/*
do {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "elect timer reset, ms:%d", ms);
syncNodeEventLog(pSyncNode, logBuf);
} while (0);
*/
} else {
sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
......@@ -1441,7 +1491,7 @@ int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
taosTmrStop(pSyncNode->pElectTimer);
pSyncNode->pElectTimer = NULL;
sTrace("vgId:%d, sync %s stop elect timer", pSyncNode->vgId, syncUtilState2String(pSyncNode->state));
// sTrace("vgId:%d, sync %s stop elect timer", pSyncNode->vgId, syncUtilState2String(pSyncNode->state));
return ret;
}
......@@ -2334,6 +2384,10 @@ void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
// Raft 3.6.2 Committing entries from previous terms
syncNodeAppendNoop(pSyncNode);
syncMaybeAdvanceCommitIndex(pSyncNode);
if (pSyncNode->replicaNum > 1) {
syncNodeDoReplicate(pSyncNode);
}
}
bool syncNodeIsMnode(SSyncNode* pSyncNode) { return (pSyncNode->vgId == 1); }
......@@ -2868,8 +2922,12 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) {
}
if (pMsg->term == ths->pRaftStore->currentTerm) {
sInfo("vgId:%d, heartbeat reset timer", 1);
// sInfo("vgId:%d, heartbeat reset timer", ths->vgId);
syncNodeResetElectTimer(ths);
if (ths->state == TAOS_SYNC_STATE_FOLLOWER) {
syncNodeFollowerCommit(ths, pMsg->commitIndex);
}
}
/*
......
......@@ -41,6 +41,8 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI
}
memset(pSender, 0, sizeof(*pSender));
int64_t timeNow = taosGetTimestampMs();
pSender->start = false;
pSender->seq = SYNC_SNAPSHOT_SEQ_INVALID;
pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;
......@@ -51,7 +53,8 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI
pSender->pSyncNode = pSyncNode;
pSender->replicaIndex = replicaIndex;
pSender->term = pSyncNode->pRaftStore->currentTerm;
pSender->privateTerm = taosGetTimestampMs() + 100;
pSender->privateTerm = timeNow + 100;
pSender->startTime = timeNow;
pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &(pSender->snapshot));
pSender->finish = false;
} else {
......@@ -402,7 +405,21 @@ char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) {
return s;
}
int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) { return 0; }
int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) {
// calculate <start, end> index
SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, pDestId);
if (pSender == NULL) {
// create sender
} else {
// if <start, end> is same
// return 0;
}
// send begin msg
return 0;
}
// -------------------------------------
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId) {
......
......@@ -113,6 +113,13 @@ endi
#return 0
vg_ready:
print ====> create stable/child table
sql create table stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int)
......@@ -120,7 +127,7 @@ sql create table stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int)
return 0
#return 0
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册