提交 def4058e 编写于 作者: B Benguang Zhao

feat: impl pipelining negotiation

上级 5878c9a3
......@@ -40,6 +40,8 @@ extern bool gRaftDetailLog;
#define SYNC_MNODE_LOG_RETENTION 10000
#define SYNC_VNODE_LOG_RETENTION 500
#define SYNC_MAX_RETRY_BACKOFF 5
#define SYNC_LOG_REPL_RETRY_WAIT_MS 50
#define SYNC_APPEND_ENTRIES_TIMEOUT_MS 10000
#define SYNC_MAX_BATCH_SIZE 1
......
......@@ -82,6 +82,13 @@ static FORCE_INLINE int64_t taosGetTimestampNs() {
return (int64_t)systemTime.tv_sec * 1000000000LL + (int64_t)systemTime.tv_nsec;
}
//@return timestamp of monotonic clock in millisecond
static FORCE_INLINE int64_t taosGetMonoTimestampMs() {
struct timespec systemTime = {0};
taosClockGetTime(CLOCK_MONOTONIC, &systemTime);
return (int64_t)systemTime.tv_sec * 1000LL + (int64_t)systemTime.tv_nsec / 1000000;
}
char *taosStrpTime(const char *buf, const char *fmt, struct tm *tm);
struct tm *taosLocalTime(const time_t *timep, struct tm *result);
struct tm *taosLocalTimeNolock(struct tm *result, const time_t *timep, int dst);
......
......@@ -281,7 +281,7 @@ typedef enum ELogicConditionType {
#define TSDB_DNODE_ROLE_VNODE 2
#define TSDB_MAX_REPLICA 5
#define TSDB_SYNC_LOG_BUFFER_SIZE 500
#define TSDB_SYNC_LOG_BUFFER_SIZE 512
#define TSDB_TBNAME_COLUMN_INDEX (-1)
#define TSDB_MULTI_TABLEMETA_MAX_NUM 100000 // maximum batch size allowed to load table meta
......
......@@ -88,6 +88,60 @@ typedef struct SPeerState {
int64_t lastSendTime;
} SPeerState;
typedef struct SSyncReplInfo {
bool barrier;
bool acked;
int64_t timeMs;
int64_t term;
} SSyncReplInfo;
typedef struct SSyncLogReplMgr {
SSyncReplInfo states[TSDB_SYNC_LOG_BUFFER_SIZE];
int64_t startIndex;
int64_t matchIndex;
int64_t endIndex;
int64_t size;
bool restored;
int64_t peerStartTime;
int32_t retryBackoff;
int32_t peerId;
} SSyncLogReplMgr;
SSyncLogReplMgr* syncLogReplMgrCreate();
void syncLogReplMgrDestroy(SSyncLogReplMgr* pMgr);
// access
static FORCE_INLINE int64_t syncLogGetRetryBackoffTimeMs(SSyncLogReplMgr* pMgr) {
return (1 << pMgr->retryBackoff) * SYNC_LOG_REPL_RETRY_WAIT_MS;
}
static FORCE_INLINE int32_t syncLogGetNextRetryBackoff(SSyncLogReplMgr* pMgr) {
return TMIN(pMgr->retryBackoff + 1, SYNC_MAX_RETRY_BACKOFF);
}
static FORCE_INLINE int32_t syncLogReplMgrUpdateTerm(SSyncLogReplMgr* pMgr, SyncIndex index, SyncTerm term) {
if (index < pMgr->startIndex || index >= pMgr->endIndex) {
return -1;
}
pMgr->states[(index + pMgr->size) % pMgr->size].term = term;
return 0;
}
SyncTerm syncLogReplMgrGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index);
int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SRaftId* pDestId,
bool* pBarrier);
int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg);
int32_t syncLogBufferReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode);
int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode);
int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode);
int32_t syncLogResetLogReplMgr(SSyncLogReplMgr* pMgr);
int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg);
int32_t syncLogReplMgrProcessReplyInNormalMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg);
int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode);
// others
bool syncLogReplMgrValidate(SSyncLogReplMgr* pMgr);
typedef struct SSyncLogBufEntry {
SSyncRaftEntry* pItem;
SyncIndex prevLogIndex;
......@@ -115,14 +169,15 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
int64_t syncLogBufferLoad(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex toIndex);
int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode);
int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t commitIndex);
SSyncRaftEntry* syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex index, bool* pInBuf);
int64_t syncNodeUpdateCommitIndex(SSyncNode* ths, SyncIndex commtIndex);
SyncAppendEntries* syncLogToAppendEntries(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex index);
SyncAppendEntries* syncLogToAppendEntries(SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevLogTerm);
// private
int32_t syncLogBufferValidate(SSyncLogBuffer* pBuf);
int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SyncIndex toIndex);
int32_t syncLogBufferReplicate(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex index);
int32_t syncLogBufferReplicate(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevLogTerm);
void syncIndexMgrSetIndex(SSyncIndexMgr* pSyncIndexMgr, const SRaftId* pRaftId, SyncIndex index);
bool syncNodeAgreedUpon(SSyncNode* pNode, SyncIndex index);
......@@ -225,11 +280,14 @@ typedef struct SSyncNode {
SSyncRespMgr* pSyncRespMgr;
// restore state
_Atomic bool restoreFinish;
bool restoreFinish;
// SSnapshot* pSnapshot;
SSyncSnapshotSender* senders[TSDB_MAX_REPLICA];
SSyncSnapshotReceiver* pNewNodeReceiver;
// log replication mgr
SSyncLogReplMgr* logReplMgrs[TSDB_MAX_REPLICA];
SPeerState peerStates[TSDB_MAX_REPLICA];
// is config changing
......@@ -309,6 +367,9 @@ void syncNodeCandidate2Follower(SSyncNode* pSyncNode);
void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId);
void syncNodeVoteForSelf(SSyncNode* pSyncNode);
// log replication
SSyncLogReplMgr* syncNodeGetLogReplMgr(SSyncNode* pNode, SRaftId* pDestId);
// snapshot --------------
bool syncNodeHasSnapshot(SSyncNode* pSyncNode);
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode);
......
......@@ -18,6 +18,7 @@
#include "syncRaftCfg.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncReplication.h"
#include "syncSnapshot.h"
#include "syncUtil.h"
#include "syncVoteMgr.h"
......@@ -318,16 +319,17 @@ int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
ASSERT(pNode->pFsm != NULL && "pFsm not registered");
ASSERT(pNode->pFsm->FpGetSnapshotInfo != NULL && "FpGetSnapshotInfo not registered");
SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
SSnapshot snapshot;
if (pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot) < 0) {
sError("vgId:%d, failed to get snapshot info since %s", pNode->vgId, terrstr());
goto _err;
}
SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
SyncIndex commitIndex = snapshot.lastApplyIndex;
SyncTerm commitTerm = snapshot.lastApplyTerm;
SyncIndex toIndex = TMAX(lastVer, commitIndex);
ASSERT(lastVer >= commitIndex);
// update match index
pBuf->commitIndex = commitIndex;
......@@ -392,7 +394,7 @@ int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
}
// update startIndex
pBuf->startIndex = index;
pBuf->startIndex = takeDummy ? index : index + 1;
// validate
syncLogBufferValidate(pBuf);
......@@ -491,18 +493,44 @@ int32_t syncLogBufferRollbackMatchIndex(SSyncLogBuffer* pBuf, SSyncNode* pNode,
return 0;
}
FORCE_INLINE SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf) {
SyncIndex index = pBuf->matchIndex;
SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem;
ASSERT(pEntry != NULL);
return pEntry->term;
}
int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm) {
taosThreadMutexLock(&pBuf->mutex);
syncLogBufferValidate(pBuf);
int32_t ret = 0;
int32_t ret = -1;
SyncIndex index = pEntry->index;
SyncIndex prevIndex = pEntry->index - 1;
if (index <= pBuf->commitIndex || index - pBuf->startIndex > pBuf->size) {
sInfo("vgId:%d, cannot accept index:%" PRId64 " into log buffer. start index: %" PRId64 ", commit index: %" PRId64
", end index:%" PRId64 ")",
pNode->vgId, index, pBuf->startIndex, pBuf->commitIndex, pBuf->endIndex);
ret = (index <= pBuf->commitIndex) ? 0 : -1;
SyncTerm lastMatchTerm = syncLogBufferGetLastMatchTerm(pBuf);
if (index <= pBuf->commitIndex) {
sInfo("vgId:%d, raft entry already committed. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64
" %" PRId64 " %" PRId64 ", %" PRId64 ")",
pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex,
pBuf->endIndex);
ret = 0;
goto _out;
}
if (index - pBuf->startIndex >= pBuf->size) {
sInfo("vgId:%d, raft entry out of buffer capacity. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64
" %" PRId64 " %" PRId64 ", %" PRId64 ")",
pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex,
pBuf->endIndex);
goto _out;
}
if (index > pBuf->matchIndex && lastMatchTerm != prevTerm) {
sInfo("vgId:%d, not ready to accept raft entry (i.e. across barrier). index: %" PRId64 ", term: %" PRId64
": prevterm: %" PRId64 " /= lastmatch: %" PRId64 ". log buffer: [%" PRId64 " %" PRId64 " %" PRId64
", %" PRId64 ")",
pNode->vgId, pEntry->index, pEntry->term, prevTerm, lastMatchTerm, pBuf->startIndex, pBuf->commitIndex,
pBuf->matchIndex, pBuf->endIndex);
goto _out;
}
......@@ -511,14 +539,16 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
if (pExist != NULL) {
ASSERT(pEntry->index == pExist->index);
if (pEntry->term > pExist->term) {
if (pEntry->term != pExist->term) {
(void)syncLogBufferRollback(pBuf, index);
} else {
sInfo("vgId:%d, %s raft entry received. index:%" PRId64 ", term: %" PRId64 "", pNode->vgId,
((pEntry->term < pExist->term) ? "stale" : "duplicate"), pEntry->index, pEntry->term);
sInfo("vgId:%d, duplicate raft entry received. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64
" %" PRId64 " %" PRId64 ", %" PRId64 ")",
pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex,
pBuf->endIndex);
SyncTerm existPrevTerm = pBuf->entries[index % pBuf->size].prevLogTerm;
ASSERT(pEntry->term < pExist->term || (pEntry->term == pExist->term && prevTerm == existPrevTerm));
ret = (pEntry->term < pExist->term) ? 0 : -1;
ASSERT(pEntry->term == pExist->term && prevTerm == existPrevTerm);
ret = 0;
goto _out;
}
}
......@@ -531,6 +561,9 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
// update end index
pBuf->endIndex = TMAX(index + 1, pBuf->endIndex);
// success
ret = 0;
_out:
syncEntryDestroy(pEntry);
syncLogBufferValidate(pBuf);
......@@ -550,6 +583,7 @@ SSyncRaftEntry* syncLogAppendEntriesToRaftEntry(const SyncAppendEntries* pMsg) {
}
int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
ASSERT(pEntry->index >= 0);
SyncIndex lastVer = pLogStore->syncLogLastIndex(pLogStore);
if (lastVer >= pEntry->index && pLogStore->syncLogTruncate(pLogStore, pEntry->index) < 0) {
sError("failed to truncate log store since %s. from index:%" PRId64 "", terrstr(), pEntry->index);
......@@ -563,6 +597,9 @@ int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
pEntry->term);
return -1;
}
lastVer = pLogStore->syncLogLastIndex(pLogStore);
ASSERT(pEntry->index == lastVer);
return 0;
}
......@@ -607,10 +644,14 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
goto _out;
}
// increase match index
pBuf->matchIndex = index;
sInfo("vgId:%d, log buffer proceed. start index: %" PRId64 ", match index: %" PRId64 ", end index: %" PRId64,
pNode->vgId, pBuf->startIndex, pBuf->matchIndex, pBuf->endIndex);
// replicate on demand
if (pNode->state == TAOS_SYNC_STATE_LEADER && pNode->replicaNum > 1) {
(void)syncLogBufferReplicate(pBuf, pNode, index);
}
(void)syncNodeReplicate(pNode);
// persist
if (syncLogStorePersist(pLogStore, pEntry) < 0) {
......@@ -618,16 +659,15 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
terrstr(), pEntry->index);
goto _out;
}
// increment
pBuf->matchIndex = index;
matchIndex = pBuf->matchIndex;
ASSERT(pEntry->index == pBuf->matchIndex);
// update my match index
matchIndex = pBuf->matchIndex;
syncIndexMgrSetIndex(pNode->pMatchIndex, &pNode->myRaftId, pBuf->matchIndex);
} // end of while
_out:
pBuf->matchIndex = matchIndex;
syncLogBufferValidate(pBuf);
taosThreadMutexUnlock(&pBuf->mutex);
return matchIndex;
......@@ -659,7 +699,7 @@ int32_t syncLogBufferValidate(SSyncLogBuffer* pBuf) {
ASSERT(pBuf->commitIndex <= pBuf->matchIndex);
ASSERT(pBuf->matchIndex < pBuf->endIndex);
ASSERT(pBuf->endIndex - pBuf->startIndex <= pBuf->size);
for (SyncIndex index = pBuf->commitIndex; index <= pBuf->matchIndex; index++) {
for (SyncIndex index = pBuf->startIndex; index <= pBuf->matchIndex; index++) {
SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem;
ASSERT(pEntry != NULL);
}
......@@ -694,20 +734,11 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
// execute in fsm
for (int64_t index = pBuf->commitIndex + 1; index <= upperIndex; index++) {
// get a log entry
if (index >= pBuf->startIndex) {
inBuf = true;
pEntry = pBuf->entries[index % pBuf->size].pItem;
} else {
inBuf = false;
if (pLogStore->syncLogGetEntry(pLogStore, index, &pEntry) < 0) {
sError("vgId:%d, failed to get log entry since %s. index:%" PRId64 "", pNode->vgId, terrstr(), index);
ret = -1;
goto _out;
}
pEntry = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf);
if (pEntry == NULL) {
goto _out;
}
ASSERT(pEntry != NULL);
// execute it
if (!syncUtilUserCommit(pEntry->originalRpcType)) {
sInfo("vgId:%d, non-user msg in raft log entry. index: %" PRId64 ", term:%" PRId64 "", vgId, pEntry->index,
......@@ -738,8 +769,8 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
}
// recycle
// TODO: with a grace period of one third of free space before commitIndex in ring buffer
SyncIndex until = pBuf->commitIndex;
SyncIndex used = pBuf->endIndex - pBuf->startIndex;
SyncIndex until = pBuf->commitIndex - (pBuf->size - used) / 2;
for (SyncIndex index = pBuf->startIndex; index < until; index++) {
SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem;
ASSERT(pEntry != NULL);
......@@ -796,9 +827,6 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) {
syncNodeStepDown(ths, pMsg->term);
syncNodeResetElectTimer(ths);
// update commit index
(void)syncNodeUpdateCommitIndex(ths, pMsg->commitIndex);
if (pMsg->dataLen < (int32_t)sizeof(SSyncRaftEntry)) {
sError("vgId:%d, incomplete append entries received. prev index:%" PRId64 ", term:%" PRId64 ", datalen:%d",
ths->vgId, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen);
......@@ -825,15 +853,17 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) {
// accept
if (syncLogBufferAccept(ths->pLogBuf, ths, pEntry, pMsg->prevLogTerm) < 0) {
sWarn("vgId:%d, failed to accept raft entry into log buffer. index:%" PRId64 ", term:%" PRId64, ths->vgId,
pEntry->index, pEntry->term);
goto _SEND_RESPONSE;
}
pReply->success = true;
_SEND_RESPONSE:
// update match index
pReply->matchIndex = syncLogBufferProceed(ths->pLogBuf, ths);
bool matched = (pReply->matchIndex >= pReply->lastSendIndex);
pReply->success = matched;
if (matched) {
// update commit index only after matching
(void)syncNodeUpdateCommitIndex(ths, pMsg->commitIndex);
}
// ack, i.e. send response
SRpcMsg rpcMsg;
......@@ -841,7 +871,7 @@ _SEND_RESPONSE:
(void)syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
// commit index, i.e. leader notice me
if (syncLogBufferCommit(ths->pLogBuf, ths, pMsg->commitIndex) < 0) {
if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) {
sError("vgId:%d, failed to commit raft fsm log since %s.", ths->vgId, terrstr());
goto _out;
}
......
......@@ -85,11 +85,11 @@ static void syncNodeStartSnapshotOnce(SSyncNode* ths, SyncIndex beginIndex, Sync
}
int64_t syncNodeUpdateCommitIndex(SSyncNode* ths, SyncIndex commitIndex) {
ths->commitIndex = TMAX(commitIndex, ths->commitIndex);
SyncIndex lastVer = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
commitIndex = TMIN(ths->commitIndex, lastVer);
ths->pLogStore->syncLogUpdateCommitIndex(ths->pLogStore, commitIndex);
return commitIndex;
commitIndex = TMAX(commitIndex, ths->commitIndex);
ths->commitIndex = TMIN(commitIndex, lastVer);
ths->pLogStore->syncLogUpdateCommitIndex(ths->pLogStore, ths->commitIndex);
return ths->commitIndex;
}
int64_t syncNodeCheckCommitIndex(SSyncNode* ths, SyncIndex indexLikely) {
......@@ -102,50 +102,77 @@ int64_t syncNodeCheckCommitIndex(SSyncNode* ths, SyncIndex indexLikely) {
return ths->commitIndex;
}
int32_t syncLogBufferCatchingUpReplicate(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex fromIndex, SRaftId destId) {
taosThreadMutexLock(&pBuf->mutex);
SSyncRaftEntry* syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex index, bool* pInBuf) {
SSyncRaftEntry* pEntry = NULL;
if (index >= pBuf->endIndex) {
return NULL;
}
if (index > pBuf->startIndex) { // startIndex might be dummy
*pInBuf = true;
pEntry = pBuf->entries[index % pBuf->size].pItem;
} else {
*pInBuf = false;
if (pNode->pLogStore->syncLogGetEntry(pNode->pLogStore, index, &pEntry) < 0) {
sError("vgId:%d, failed to get log entry since %s. index:%" PRId64 "", pNode->vgId, terrstr(), index);
}
}
return pEntry;
}
bool syncLogReplMgrValidate(SSyncLogReplMgr* pMgr) {
ASSERT(pMgr->startIndex <= pMgr->endIndex);
for (SyncIndex index = pMgr->startIndex; index < pMgr->endIndex; index++) {
ASSERT(pMgr->states[(index + pMgr->size) % pMgr->size].barrier == false || index + 1 == pMgr->endIndex);
}
return true;
}
static FORCE_INLINE bool syncLogIsReplicationBarrier(SSyncRaftEntry* pEntry) {
return pEntry->originalRpcType == TDMT_SYNC_NOOP;
}
int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SRaftId* pDestId,
bool* pBarrier) {
SSyncRaftEntry* pEntry = NULL;
SyncAppendEntries* pMsgOut = NULL;
SyncIndex index = fromIndex;
bool inBuf = false;
int32_t ret = -1;
SyncTerm prevLogTerm = -1;
SSyncLogBuffer* pBuf = pNode->pLogBuf;
if (pNode->state != TAOS_SYNC_STATE_LEADER || pNode->replicaNum <= 1) {
sInfo("vgId:%d, replicate one msg index: %" PRId64 " to dest: 0x%016" PRIx64, pNode->vgId, index, pDestId->addr);
pEntry = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf);
if (pEntry == NULL) {
sError("vgId:%d, failed to get raft entry for index: %" PRId64 "", pNode->vgId, index);
goto _out;
}
*pBarrier = syncLogIsReplicationBarrier(pEntry);
if (index < pBuf->startIndex) {
sError("vgId:%d, (not implemented yet) replication fromIndex: %" PRId64
" that is less than pBuf->startIndex: %" PRId64 ". destId: 0x%016" PRId64 "",
pNode->vgId, fromIndex, pBuf->startIndex, destId.addr);
prevLogTerm = syncLogReplMgrGetPrevLogTerm(pMgr, pNode, index);
if (prevLogTerm < 0 && terrno != TSDB_CODE_SUCCESS) {
sError("vgId:%d, failed to get prev log term since %s. index: %" PRId64 "", pNode->vgId, terrstr(), index);
goto _out;
}
(void)syncLogReplMgrUpdateTerm(pMgr, pEntry->index, pEntry->term);
if (index > pBuf->matchIndex) {
pMsgOut = syncLogToAppendEntries(pNode, pEntry, prevLogTerm);
if (pMsgOut == NULL) {
sError("vgId:%d, failed to get append entries for index:%" PRId64 "", pNode->vgId, index);
goto _out;
}
do {
pMsgOut = syncLogToAppendEntries(pBuf, pNode, index);
if (pMsgOut == NULL) {
sError("vgId:%d, failed to assembly append entries msg since %s. index: %" PRId64 "", pNode->vgId, terrstr(),
index);
goto _out;
}
if (syncNodeSendAppendEntries(pNode, &destId, pMsgOut) < 0) {
sWarn("vgId:%d, failed to send append entries msg since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "",
pNode->vgId, terrstr(), index, destId.addr);
goto _out;
}
index += 1;
syncAppendEntriesDestroy(pMsgOut);
pMsgOut = NULL;
} while (false && index <= pBuf->commitIndex);
(void)syncNodeSendAppendEntries(pNode, pDestId, pMsgOut);
ret = 0;
_out:
syncAppendEntriesDestroy(pMsgOut);
pMsgOut = NULL;
taosThreadMutexUnlock(&pBuf->mutex);
return 0;
if (!inBuf) {
syncEntryDestroy(pEntry);
pEntry = NULL;
}
return ret;
}
int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
......@@ -185,23 +212,15 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMs
SyncIndex indexLikely = TMIN(pMsg->matchIndex, ths->pLogBuf->matchIndex);
SyncIndex commitIndex = syncNodeCheckCommitIndex(ths, indexLikely);
(void)syncLogBufferCommit(ths->pLogBuf, ths, commitIndex);
} else {
SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
if (nextIndex > SYNC_INDEX_BEGIN) {
--nextIndex;
}
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex);
}
// send next append entries
SPeerState* pState = syncNodeGetPeerState(ths, &(pMsg->srcId));
ASSERT(pState != NULL);
if (pMsg->lastSendIndex == pState->lastSendIndex) {
syncNodeReplicateOne(ths, &(pMsg->srcId));
// replicate log
SSyncLogReplMgr* pMgr = syncNodeGetLogReplMgr(ths, &pMsg->srcId);
ASSERT(pMgr != NULL);
if (pMgr != NULL) {
(void)syncLogReplMgrProcessReply(pMgr, ths, pMsg);
}
}
return 0;
}
......
......@@ -82,6 +82,15 @@ void syncIndexMgrSetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId,
index);
}
SSyncLogReplMgr *syncNodeGetLogReplMgr(SSyncNode *pNode, SRaftId *pDestId) {
for (int i = 0; i < pNode->replicaNum; i++) {
if (syncUtilSameId(&(pNode->replicasId[i]), pDestId)) {
return pNode->logReplMgrs[i];
}
}
return NULL;
}
SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId) {
if (pSyncIndexMgr == NULL) {
return SYNC_INDEX_INVALID;
......
......@@ -1103,6 +1103,252 @@ int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
return ret;
}
int32_t syncLogResetLogReplMgr(SSyncLogReplMgr* pMgr) {
ASSERT(pMgr->startIndex >= 0);
for (SyncIndex index = pMgr->startIndex; index < pMgr->endIndex; index++) {
memset(&pMgr->states[index % pMgr->size], 0, sizeof(pMgr->states[0]));
}
pMgr->startIndex = 0;
pMgr->matchIndex = 0;
pMgr->endIndex = 0;
pMgr->restored = false;
pMgr->retryBackoff = 0;
return 0;
}
int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
if (pMgr->endIndex <= pMgr->startIndex) {
return 0;
}
int32_t ret = -1;
bool retried = false;
int64_t retryWaitMs = syncLogGetRetryBackoffTimeMs(pMgr);
for (SyncIndex index = pMgr->startIndex; index < pMgr->endIndex; index++) {
int64_t pos = index % pMgr->size;
ASSERT(!pMgr->states[pos].barrier || (index == pMgr->startIndex || index + 1 == pMgr->endIndex));
if (pMgr->states[pos].acked) {
continue;
}
int64_t nowMs = taosGetMonoTimestampMs();
if (nowMs < pMgr->states[pos].timeMs + retryWaitMs) {
break;
}
SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
bool barrier = false;
if (syncLogBufferReplicateOneTo(pMgr, pNode, index, pDestId, &barrier) < 0) {
sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId,
terrstr(), index, pDestId->addr);
goto _out;
}
ASSERT(barrier == pMgr->states[pos].barrier);
pMgr->states[pos].timeMs = nowMs;
pMgr->states[pos].acked = false;
retried = true;
}
ret = 0;
_out:
if (retried) {
pMgr->retryBackoff = syncLogGetNextRetryBackoff(pMgr);
}
return ret;
}
int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode,
SyncAppendEntriesReply* pMsg) {
SRaftId destId = pMsg->srcId;
ASSERT(pMgr->restored == false);
if (pMgr->endIndex == 0) {
ASSERT(pMgr->startIndex == 0);
ASSERT(pMgr->matchIndex == 0);
if (pMsg->matchIndex < 0) {
pMgr->restored = true;
return 0;
}
} else {
if (pMsg->lastSendIndex < pMgr->startIndex || pMsg->lastSendIndex >= pMgr->endIndex) {
syncLogReplMgrRetryOnNeed(pMgr, pNode);
return 0;
}
pMgr->states[pMsg->lastSendIndex % pMgr->size].acked = true;
if (pMsg->matchIndex == pMsg->lastSendIndex) {
pMgr->restored = true;
return 0;
}
(void)syncLogResetLogReplMgr(pMgr);
}
SyncIndex index = TMIN(pMsg->matchIndex, pNode->pLogBuf->matchIndex);
bool barrier = false;
ASSERT(index >= 0);
// send match index
if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &destId, &barrier) < 0) {
sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId,
terrstr(), index, destId.addr);
return -1;
}
int64_t nowMs = taosGetMonoTimestampMs();
pMgr->states[index % pMgr->size].barrier = barrier;
pMgr->states[index % pMgr->size].timeMs = nowMs;
pMgr->states[index % pMgr->size].acked = false;
pMgr->matchIndex = index;
pMgr->startIndex = index;
pMgr->endIndex = index + 1;
return 0;
}
int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) {
SSyncLogBuffer* pBuf = pNode->pLogBuf;
taosThreadMutexLock(&pBuf->mutex);
if (pMsg->startTime != pMgr->peerStartTime) {
syncLogResetLogReplMgr(pMgr);
pMgr->peerStartTime = pMsg->startTime;
}
if (pMgr->restored) {
(void)syncLogReplMgrProcessReplyInNormalMode(pMgr, pNode, pMsg);
} else {
(void)syncLogReplMgrProcessReplyInRecoveryMode(pMgr, pNode, pMsg);
}
taosThreadMutexUnlock(&pBuf->mutex);
return 0;
}
int32_t syncLogBufferReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
SSyncLogBuffer* pBuf = pNode->pLogBuf;
if (pMgr->restored) {
(void)syncLogReplMgrReplicateAttemptedOnce(pMgr, pNode);
} else {
(void)syncLogReplMgrReplicateProbeOnce(pMgr, pNode);
}
return 0;
}
int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
ASSERT(!pMgr->restored);
SyncIndex index = pNode->pLogBuf->matchIndex;
SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
bool barrier = false;
if (syncLogBufferReplicateOneTo(pMgr, pNode, index, pDestId, &barrier) < 0) {
sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId,
terrstr(), index, pDestId->addr);
return -1;
}
SSyncLogBuffer* pBuf = pNode->pLogBuf;
sInfo("vgId:%d, attempted to probe the %d'th peer. pMgr(restored:%d): [%" PRId64 " %" PRId64 ", %" PRId64
"), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
pNode->vgId, pMgr->peerId, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pBuf->startIndex,
pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
return 0;
}
int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
ASSERT(pMgr->restored);
SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
int32_t batchSize = TMAX(1, pMgr->size / 10);
int32_t count = 0;
for (SyncIndex index = pMgr->endIndex; index <= pNode->pLogBuf->matchIndex; index++) {
if (batchSize < count++ || pMgr->startIndex + pMgr->size <= index) {
break;
}
if (pMgr->startIndex + 1 < index && pMgr->states[(index - 1) % pMgr->size].barrier) {
break;
}
int64_t pos = index % pMgr->size;
SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
bool barrier = false;
if (syncLogBufferReplicateOneTo(pMgr, pNode, index, pDestId, &barrier) < 0) {
sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId,
terrstr(), index, pDestId->addr);
return -1;
}
pMgr->states[pos].barrier = barrier;
pMgr->states[pos].timeMs = taosGetMonoTimestampMs();
pMgr->states[pos].acked = false;
pMgr->endIndex = index + 1;
if (barrier) {
break;
}
}
SSyncLogBuffer* pBuf = pNode->pLogBuf;
sInfo("vgId:%d, attempted to replicate %d msgs to the %d'th peer. pMgr(restored:%d): [%" PRId64 " %" PRId64
", %" PRId64 "), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
pNode->vgId, count, pMgr->peerId, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
syncLogReplMgrRetryOnNeed(pMgr, pNode);
return 0;
}
int32_t syncLogReplMgrProcessReplyInNormalMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) {
ASSERT(pMgr->restored == true);
if (pMgr->startIndex <= pMsg->lastSendIndex && pMsg->lastSendIndex < pMgr->endIndex) {
pMgr->states[pMsg->lastSendIndex % pMgr->size].acked = true;
pMgr->matchIndex = TMAX(pMgr->matchIndex, pMsg->matchIndex);
for (SyncIndex index = pMgr->startIndex; index < pMgr->matchIndex; index++) {
memset(&pMgr->states[index % pMgr->size], 0, sizeof(pMgr->states[0]));
}
pMgr->startIndex = pMgr->matchIndex;
}
return syncLogReplMgrReplicateAttemptedOnce(pMgr, pNode);
}
SSyncLogReplMgr* syncLogReplMgrCreate() {
SSyncLogReplMgr* pMgr = taosMemoryCalloc(1, sizeof(SSyncLogReplMgr));
if (pMgr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pMgr->size = sizeof(pMgr->states) / sizeof(pMgr->states[0]);
ASSERT(pMgr->size == TSDB_SYNC_LOG_BUFFER_SIZE);
return pMgr;
_err:
taosMemoryFree(pMgr);
return NULL;
}
void syncLogReplMgrDestroy(SSyncLogReplMgr* pMgr) {
if (pMgr == NULL) {
return;
}
(void)taosMemoryFree(pMgr);
return;
}
int32_t syncNodeLogReplMgrInit(SSyncNode* pNode) {
for (int i = 0; i < TSDB_MAX_REPLICA; i++) {
ASSERT(pNode->logReplMgrs[i] == NULL);
pNode->logReplMgrs[i] = syncLogReplMgrCreate();
pNode->logReplMgrs[i]->peerId = i;
ASSERT(pNode->logReplMgrs[i] != NULL && "Out of memory.");
}
return 0;
}
void syncNodeLogReplMgrDestroy(SSyncNode* pNode) {
for (int i = 0; i < TSDB_MAX_REPLICA; i++) {
syncLogReplMgrDestroy(pNode->logReplMgrs[i]);
pNode->logReplMgrs[i] = NULL;
}
}
SSyncLogBuffer* syncLogBufferCreate() {
SSyncLogBuffer* pBuf = taosMemoryCalloc(1, sizeof(SSyncLogBuffer));
if (pBuf == NULL) {
......@@ -1397,9 +1643,13 @@ SSyncNode* syncNodeOpen(SSyncInfo* pOldSyncInfo) {
// is config changing
pSyncNode->changing = false;
// replication mgr
syncNodeLogReplMgrInit(pSyncNode);
// peer state
syncNodePeerStateInit(pSyncNode);
//
// min match index
pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;
......@@ -1532,6 +1782,7 @@ void syncNodeClose(SSyncNode* pSyncNode) {
ret = raftStoreClose(pSyncNode->pRaftStore);
ASSERT(ret == 0);
syncNodeLogReplMgrDestroy(pSyncNode);
syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
pSyncNode->pSyncRespMgr = NULL;
voteGrantedDestroy(pSyncNode->pVotesGranted);
......@@ -2477,6 +2728,11 @@ int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
pBuf->endIndex = pBuf->matchIndex + 1;
// reset repl mgr
for (int i = 0; i < pNode->replicaNum; i++) {
SSyncLogReplMgr* pMgr = pNode->logReplMgrs[i];
syncLogResetLogReplMgr(pMgr);
}
taosThreadMutexUnlock(&pBuf->mutex);
return 0;
}
......@@ -2637,8 +2893,12 @@ void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
syncNodeLog2("==state change syncNodeCandidate2Leader==", pSyncNode);
// Raft 3.6.2 Committing entries from previous terms
syncNodeAppendNoop(pSyncNode);
int32_t ret = syncNodeAppendNoop(pSyncNode);
ASSERT(ret == 0);
SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
ASSERT(lastIndex >= 0);
sInfo("vgId:%d, become leader. term: %" PRId64 ", commit index: %" PRId64 ", last index: %" PRId64 "",
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, lastIndex);
}
void syncNodeCandidate2LeaderOld(SSyncNode* pSyncNode) {
......@@ -2671,22 +2931,33 @@ int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) {
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
ASSERT(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER);
pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;
SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
sInfo("vgId:%d, become candidate from follower. term: %" PRId64 ", commit index: %" PRId64 ", last index: %" PRId64,
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, lastIndex);
syncNodeEventLog(pSyncNode, "follower to candidate");
// syncNodeEventLog(pSyncNode, "follower to candidate");
}
void syncNodeLeader2Follower(SSyncNode* pSyncNode) {
ASSERT(pSyncNode->state == TAOS_SYNC_STATE_LEADER);
syncNodeBecomeFollower(pSyncNode, "leader to follower");
syncNodeEventLog(pSyncNode, "leader to follower");
SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
sInfo("vgId:%d, become follower from leader. term: %" PRId64 ", commit index: %" PRId64 ", last index: %" PRId64,
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, lastIndex);
// syncNodeEventLog(pSyncNode, "leader to follower");
}
void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
syncNodeBecomeFollower(pSyncNode, "candidate to follower");
syncNodeEventLog(pSyncNode, "candidate to follower");
SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
sInfo("vgId:%d, become follower from candidate. term: %" PRId64 ", commit index: %" PRId64 ", last index: %" PRId64,
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, lastIndex);
// syncNodeEventLog(pSyncNode, "candidate to follower");
}
// raft vote --------------
......@@ -3109,6 +3380,11 @@ int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
// proceed match index, with replicating on needed
SyncIndex matchIndex = syncLogBufferProceed(ths->pLogBuf, ths);
sInfo("vgId:%d, append raft log index: %" PRId64 ", term: %" PRId64 " log buffer: [%" PRId64 " %" PRId64 " %" PRId64
", %" PRId64 ")",
ths->vgId, pEntry->index, pEntry->term, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex);
// multi replica
if (ths->replicaNum > 1) {
return 0;
......@@ -3135,7 +3411,8 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) {
return -1;
}
return syncNodeAppend(ths, pEntry);
int32_t ret = syncNodeAppend(ths, pEntry);
return 0;
}
static int32_t syncNodeAppendNoopOld(SSyncNode* ths) {
......@@ -3264,7 +3541,7 @@ int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
syncLogBufferValidate(pBuf);
SyncIndex index = pEntry->index;
if (index - pBuf->startIndex > pBuf->size) {
if (index - pBuf->startIndex >= pBuf->size) {
sError("vgId:%d, failed to append due to log buffer full. index:%" PRId64 "", pNode->vgId, index);
goto _out;
}
......@@ -3294,30 +3571,57 @@ _out:
return -1;
}
SyncTerm syncLogBufferGetTerm(SSyncLogBuffer* pBuf, SyncIndex index) {
ASSERT(pBuf->startIndex <= index && index < pBuf->endIndex);
SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem;
ASSERT(pEntry != NULL);
return pEntry->term;
}
SyncTerm syncLogReplMgrGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index) {
SSyncLogBuffer* pBuf = pNode->pLogBuf;
SSyncRaftEntry* pEntry = NULL;
SyncIndex prevIndex = index - 1;
SyncTerm prevLogTerm = -1;
terrno = TSDB_CODE_SUCCESS;
SyncAppendEntries* syncLogToAppendEntries(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex index) {
SyncAppendEntries* pMsg = NULL;
if (prevIndex == -1) return 0;
if (index < pBuf->startIndex || index >= pBuf->endIndex) {
sError("vgId:%d, log entry (%" PRId64 ") out of range of log buffer [%" PRId64 ", %" PRId64 ").", pNode->vgId,
index, pBuf->startIndex, pBuf->endIndex);
return pMsg;
if (index - 1 > pBuf->matchIndex) {
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
return -1;
}
SSyncRaftEntry* pEntry = pBuf->entries[index % pBuf->size].pItem;
if (pEntry == NULL) {
sError("vgId:%d, log entry (%" PRId64 ") not exist in log buffer [%" PRId64 ", %" PRId64 ").", pNode->vgId, index,
pBuf->startIndex, pBuf->endIndex);
return pMsg;
ASSERT(index - 1 == prevIndex);
if (index - 1 >= pBuf->startIndex) {
pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem;
ASSERT(pEntry != NULL && "no log entry found");
prevLogTerm = pBuf->entries[(index + pBuf->size) % pBuf->size].prevLogTerm;
return prevLogTerm;
}
if (pMgr->startIndex <= prevIndex && prevIndex < pMgr->endIndex) {
int64_t timeMs = pMgr->states[(prevIndex + pMgr->size) % pMgr->size].timeMs;
ASSERT(timeMs != 0 && "no log entry found");
prevLogTerm = pMgr->states[(prevIndex + pMgr->size) % pMgr->size].term;
return prevLogTerm;
}
SSnapshot snapshot;
if (pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot) == 0 && prevIndex == snapshot.lastApplyIndex) {
return snapshot.lastApplyTerm;
}
if (pNode->pLogStore->syncLogGetEntry(pNode->pLogStore, prevIndex, &pEntry) == 0) {
prevLogTerm = pEntry->term;
syncEntryDestroy(pEntry);
pEntry = NULL;
return prevLogTerm;
}
sError("vgId:%d, failed to get log term since %s. index: %" PRId64 "", pNode->vgId, terrstr(), prevIndex);
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
return -1;
}
SyncAppendEntries* syncLogToAppendEntries(SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevLogTerm) {
SyncAppendEntries* pMsg = NULL;
uint32_t datalen = pEntry->bytes;
pMsg = syncAppendEntriesBuild(datalen, pNode->vgId);
if (pMsg == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
......@@ -3326,8 +3630,8 @@ SyncAppendEntries* syncLogToAppendEntries(SSyncLogBuffer* pBuf, SSyncNode* pNode
(void)memcpy(pMsg->data, pEntry, datalen);
pMsg->prevLogIndex = index - 1;
pMsg->prevLogTerm = syncLogBufferGetTerm(pBuf, pMsg->prevLogIndex);
pMsg->prevLogIndex = pEntry->index - 1;
pMsg->prevLogTerm = prevLogTerm;
pMsg->vgId = pNode->vgId;
pMsg->srcId = pNode->myRaftId;
pMsg->term = pNode->pRaftStore->currentTerm;
......@@ -3345,10 +3649,10 @@ void syncLogReplicateAppendEntries(SSyncNode* pNode, SyncAppendEntries* pMsg) {
}
}
int32_t syncLogBufferReplicate(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex index) {
SyncAppendEntries* pMsgOut = syncLogToAppendEntries(pNode->pLogBuf, pNode, index);
int32_t syncLogBufferReplicate(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevLogTerm) {
SyncAppendEntries* pMsgOut = syncLogToAppendEntries(pNode, pEntry, prevLogTerm);
if (pMsgOut == NULL) {
sError("vgId:%d, failed to get append entries for index:%" PRId64 "", pNode->vgId, index);
sError("vgId:%d, failed to get append entries for index:%" PRId64 "", pNode->vgId, pEntry->index);
goto _err;
}
......
......@@ -136,7 +136,21 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId) {
return 0;
}
int32_t syncNodeReplicate(SSyncNode* pSyncNode) {
int32_t syncNodeReplicate(SSyncNode* pNode) {
if (pNode->state != TAOS_SYNC_STATE_LEADER || pNode->replicaNum == 1) {
return -1;
}
for (int32_t i = 0; i < pNode->replicaNum; i++) {
if (syncUtilSameId(&pNode->replicasId[i], &pNode->myRaftId)) {
continue;
}
SSyncLogReplMgr* pMgr = pNode->logReplMgrs[i];
(void)syncLogBufferReplicateOnce(pMgr, pNode);
}
return 0;
}
int32_t syncNodeReplicateOld(SSyncNode* pSyncNode) {
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
return -1;
}
......@@ -159,6 +173,17 @@ int32_t syncNodeReplicate(SSyncNode* pSyncNode) {
}
int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, SRaftId* destRaftId, SyncAppendEntries* pMsg) {
sInfo("vgId:%d, send append entries msg index: %" PRId64 " to dest: 0x%016" PRId64, pSyncNode->vgId,
pMsg->prevLogIndex + 1, destRaftId->addr);
int32_t ret = 0;
pMsg->destId = *destRaftId;
SRpcMsg rpcMsg;
syncAppendEntries2RpcMsg(pMsg, &rpcMsg);
syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg);
return 0;
}
int32_t syncNodeSendAppendEntriesOld(SSyncNode* pSyncNode, SRaftId* destRaftId, SyncAppendEntries* pMsg) {
int32_t ret = 0;
pMsg->destId = *destRaftId;
......
......@@ -23,6 +23,9 @@ static SMsgCb defaultMsgCb;
void tmsgSetDefault(const SMsgCb* msgcb) { defaultMsgCb = *msgcb; }
int32_t tmsgPutToQueue(const SMsgCb* msgcb, EQueueType qtype, SRpcMsg* pMsg) {
if (msgcb == NULL) {
return -1;
}
int32_t code = (*msgcb->putToQueueFp)(msgcb->mgmt, qtype, pMsg);
if (code != 0) {
rpcFreeCont(pMsg->pCont);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册