提交 363cbc89 编写于 作者: B Benguang Zhao

fix: update cbMeta rsp handle info in syncLogFsmExecute

上级 4526ff28
......@@ -415,7 +415,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm) {
walApplyVer(pVnode->pWal, pVnode->state.applied);
pVnode->restored = true;
vDebug("vgId:%d, sync restore finished", pVnode->config.vgId);
vInfo("vgId:%d, sync restore finished", pVnode->config.vgId);
}
static void vnodeBecomeFollower(const SSyncFSM *pFsm) {
......
......@@ -47,8 +47,13 @@ extern "C" {
//
void syncOneReplicaAdvance(SSyncNode* pSyncNode);
void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode);
bool syncAgreeIndex(SSyncNode* pSyncNode, SRaftId* pRaftId, SyncIndex index);
bool syncAgree(SSyncNode* pSyncNode, SyncIndex index);
bool syncNodeAgreedUpon(SSyncNode* pNode, SyncIndex index);
int64_t syncNodeUpdateCommitIndex(SSyncNode* ths, SyncIndex commitIndex);
int64_t syncNodeCheckCommitIndex(SSyncNode* ths, SyncIndex indexLikely);
#ifdef __cplusplus
}
......
......@@ -41,8 +41,28 @@ typedef struct SSyncLogReplMgr {
int32_t peerId;
} SSyncLogReplMgr;
typedef struct SSyncLogBufEntry {
SSyncRaftEntry* pItem;
SyncIndex prevLogIndex;
SyncTerm prevLogTerm;
} SSyncLogBufEntry;
typedef struct SSyncLogBuffer {
SSyncLogBufEntry entries[TSDB_SYNC_LOG_BUFFER_SIZE];
int64_t startIndex;
int64_t commitIndex;
int64_t matchIndex;
int64_t endIndex;
int64_t size;
TdThreadMutex mutex;
} SSyncLogBuffer;
// SSyncLogRepMgr
SSyncLogReplMgr* syncLogReplMgrCreate();
void syncLogReplMgrDestroy(SSyncLogReplMgr* pMgr);
int32_t syncLogReplMgrReset(SSyncLogReplMgr* pMgr);
int32_t syncNodeLogReplMgrInit(SSyncNode* pNode);
void syncNodeLogReplMgrDestroy(SSyncNode* pNode);
......@@ -69,35 +89,12 @@ int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Syn
int32_t syncLogBufferReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode);
int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode);
int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode);
int32_t syncLogResetLogReplMgr(SSyncLogReplMgr* pMgr);
int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg);
int32_t syncLogReplMgrProcessReplyInNormalMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg);
int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode);
int32_t syncLogReplMgrProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg);
int32_t syncNodeSendAppendEntries(SSyncNode* pNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg);
int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg);
void syncLogDestroyAppendEntries(SRpcMsg* pRpcMsg);
// others
bool syncLogReplMgrValidate(SSyncLogReplMgr* pMgr);
typedef struct SSyncLogBufEntry {
SSyncRaftEntry* pItem;
SyncIndex prevLogIndex;
SyncTerm prevLogTerm;
} SSyncLogBufEntry;
typedef struct SSyncLogBuffer {
SSyncLogBufEntry entries[TSDB_SYNC_LOG_BUFFER_SIZE];
int64_t startIndex;
int64_t commitIndex;
int64_t matchIndex;
int64_t endIndex;
int64_t size;
TdThreadMutex mutex;
} SSyncLogBuffer;
// SSyncLogBuffer
SSyncLogBuffer* syncLogBufferCreate();
void syncLogBufferDestroy(SSyncLogBuffer* pBuf);
int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode);
......@@ -117,8 +114,6 @@ int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SyncIndex toIndex);
int32_t syncLogBufferReplicate(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevLogTerm);
// others
bool syncNodeAgreedUpon(SSyncNode* pNode, SyncIndex index);
void syncIndexMgrSetIndex(SSyncIndexMgr* pSyncIndexMgr, const SRaftId* pRaftId, SyncIndex index);
int64_t syncNodeUpdateCommitIndex(SSyncNode* ths, SyncIndex commtIndex);
int32_t syncLogToAppendEntries(SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevLogTerm, SRpcMsg* pRpcMsg);
......
......@@ -53,6 +53,9 @@ int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* pDestId, SRpc
int32_t syncNodeReplicate(SSyncNode* pSyncNode);
int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapshot);
int32_t syncNodeSendAppendEntries(SSyncNode* pNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg);
int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg);
#ifdef __cplusplus
}
#endif
......
......@@ -87,6 +87,8 @@ void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceive
int32_t syncNodeOnSnapshot(SSyncNode *ths, const SRpcMsg *pMsg);
int32_t syncNodeOnSnapshotReply(SSyncNode *ths, const SRpcMsg *pMsg);
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode *pSyncNode, SyncIndex snapshotLastApplyIndex);
// start
#ifdef __cplusplus
......
......@@ -39,24 +39,6 @@
// /\ UNCHANGED <<serverVars, candidateVars, logVars, elections>>
//
int64_t syncNodeUpdateCommitIndex(SSyncNode* ths, SyncIndex commitIndex) {
SyncIndex lastVer = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
commitIndex = TMAX(commitIndex, ths->commitIndex);
ths->commitIndex = TMIN(commitIndex, lastVer);
ths->pLogStore->syncLogUpdateCommitIndex(ths->pLogStore, ths->commitIndex);
return ths->commitIndex;
}
int64_t syncNodeCheckCommitIndex(SSyncNode* ths, SyncIndex indexLikely) {
if (indexLikely > ths->commitIndex && syncNodeAgreedUpon(ths, indexLikely)) {
SyncIndex commitIndex = indexLikely;
syncNodeUpdateCommitIndex(ths, commitIndex);
sInfo("vgId:%d, agreed upon. role:%d, term:%" PRId64 ", index: %" PRId64 "", ths->vgId, ths->state,
ths->pRaftStore->currentTerm, commitIndex);
}
return ths->commitIndex;
}
int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
SyncAppendEntriesReply* pMsg = pRpcMsg->pCont;
int32_t ret = 0;
......
......@@ -311,3 +311,21 @@ bool syncAgree(SSyncNode* pNode, SyncIndex index) {
}
return false;
}
int64_t syncNodeUpdateCommitIndex(SSyncNode* ths, SyncIndex commitIndex) {
SyncIndex lastVer = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
commitIndex = TMAX(commitIndex, ths->commitIndex);
ths->commitIndex = TMIN(commitIndex, lastVer);
ths->pLogStore->syncLogUpdateCommitIndex(ths->pLogStore, ths->commitIndex);
return ths->commitIndex;
}
int64_t syncNodeCheckCommitIndex(SSyncNode* ths, SyncIndex indexLikely) {
if (indexLikely > ths->commitIndex && syncNodeAgreedUpon(ths, indexLikely)) {
SyncIndex commitIndex = indexLikely;
syncNodeUpdateCommitIndex(ths, commitIndex);
sInfo("vgId:%d, agreed upon. role:%d, term:%" PRId64 ", index: %" PRId64 "", ths->vgId, ths->state,
ths->pRaftStore->currentTerm, commitIndex);
}
return ths->commitIndex;
}
......@@ -16,10 +16,13 @@
#define _DEFAULT_SOURCE
#include "syncLogBuffer.h"
#include "syncIndexMgr.h"
#include "syncInt.h"
#include "syncRaftEntry.h"
#include "syncRaftStore.h"
#include "syncReplication.h"
#include "syncRespMgr.h"
#include "syncSnapshot.h"
#include "syncUtil.h"
int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf) {
......@@ -384,7 +387,7 @@ _out:
return matchIndex;
}
int32_t syncLogFsmExecute(SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry) {
int32_t syncLogFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry) {
ASSERT(pFsm->FpCommitCb != NULL && "No commit cb registered for the FSM");
SRpcMsg rpcMsg;
......@@ -392,7 +395,7 @@ int32_t syncLogFsmExecute(SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncR
SFsmCbMeta cbMeta = {0};
cbMeta.index = pEntry->index;
cbMeta.lastConfigIndex = -1;
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(pNode, pEntry->index);
cbMeta.isWeak = pEntry->isWeak;
cbMeta.code = 0;
cbMeta.state = role;
......@@ -401,6 +404,7 @@ int32_t syncLogFsmExecute(SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncR
cbMeta.currentTerm = term;
cbMeta.flag = -1;
(void)syncRespMgrGetAndDel(pNode->pSyncRespMgr, cbMeta.seqNum, &rpcMsg.info);
pFsm->FpCommitCb(pFsm, &rpcMsg, &cbMeta);
return 0;
}
......@@ -423,7 +427,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
ESyncState role = pNode->state;
SyncTerm term = pNode->pRaftStore->currentTerm;
SyncGroupId vgId = pNode->vgId;
int32_t ret = 0;
int32_t ret = -1;
int64_t upperIndex = TMIN(commitIndex, pBuf->matchIndex);
SSyncRaftEntry* pEntry = NULL;
bool inBuf = false;
......@@ -459,10 +463,9 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
continue;
}
if (syncLogFsmExecute(pFsm, role, term, pEntry) != 0) {
if (syncLogFsmExecute(pNode, pFsm, role, term, pEntry) != 0) {
sError("vgId:%d, failed to execute raft entry in FSM. log index:%" PRId64 ", term:%" PRId64 "", vgId,
pEntry->index, pEntry->term);
ret = -1;
goto _out;
}
pBuf->commitIndex = index;
......@@ -487,6 +490,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
pBuf->startIndex = index + 1;
}
ret = 0;
_out:
// mark as restored if needed
if (!pNode->restoreFinish && pBuf->commitIndex >= pNode->commitIndex) {
......@@ -505,7 +509,7 @@ _out:
return ret;
}
int32_t syncLogResetLogReplMgr(SSyncLogReplMgr* pMgr) {
int32_t syncLogReplMgrReset(SSyncLogReplMgr* pMgr) {
ASSERT(pMgr->startIndex >= 0);
for (SyncIndex index = pMgr->startIndex; index < pMgr->endIndex; index++) {
memset(&pMgr->states[index % pMgr->size], 0, sizeof(pMgr->states[0]));
......@@ -601,7 +605,7 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod
return 0;
}
(void)syncLogResetLogReplMgr(pMgr);
(void)syncLogReplMgrReset(pMgr);
}
// send match index
......@@ -633,7 +637,7 @@ int32_t syncLogReplMgrProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pN
if (pMsg->startTime != 0 && pMsg->startTime != pMgr->peerStartTime) {
sInfo("vgId:%d, reset sync log repl mgr in heartbeat. start time:%" PRId64 ", old start time:%" PRId64 "",
pNode->vgId, pMsg->startTime, pMgr->peerStartTime);
syncLogResetLogReplMgr(pMgr);
syncLogReplMgrReset(pMgr);
pMgr->peerStartTime = pMsg->startTime;
}
taosThreadMutexUnlock(&pBuf->mutex);
......@@ -647,7 +651,7 @@ int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Sync
sInfo("vgId:%d, reset sync log repl mgr in append entries reply. start time:%" PRId64 ", old start time:%" PRId64
"",
pNode->vgId, pMsg->startTime, pMgr->peerStartTime);
syncLogResetLogReplMgr(pMgr);
syncLogReplMgrReset(pMgr);
pMgr->peerStartTime = pMsg->startTime;
}
......@@ -861,7 +865,7 @@ int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
// reset repl mgr
for (int i = 0; i < pNode->replicaNum; i++) {
SSyncLogReplMgr* pMgr = pNode->logReplMgrs[i];
syncLogResetLogReplMgr(pMgr);
syncLogReplMgrReset(pMgr);
}
taosThreadMutexUnlock(&pBuf->mutex);
return 0;
......@@ -884,14 +888,6 @@ SSyncRaftEntry* syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode,
return pEntry;
}
bool syncLogReplMgrValidate(SSyncLogReplMgr* pMgr) {
ASSERT(pMgr->startIndex <= pMgr->endIndex);
for (SyncIndex index = pMgr->startIndex; index < pMgr->endIndex; index++) {
ASSERT(pMgr->states[(index + pMgr->size) % pMgr->size].barrier == false || index + 1 == pMgr->endIndex);
}
return true;
}
int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm,
SRaftId* pDestId, bool* pBarrier) {
SSyncRaftEntry* pEntry = NULL;
......
......@@ -58,7 +58,6 @@ static int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeade
static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry);
static ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
static SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex);
int64_t syncOpen(SSyncInfo* pSyncInfo) {
SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo);
......
......@@ -16,6 +16,7 @@
#define _DEFAULT_SOURCE
#include "tmsgcb.h"
#include "taoserror.h"
#include "transLog.h"
#include "trpc.h"
static SMsgCb defaultMsgCb;
......@@ -23,9 +24,7 @@ static SMsgCb defaultMsgCb;
void tmsgSetDefault(const SMsgCb* msgcb) { defaultMsgCb = *msgcb; }
int32_t tmsgPutToQueue(const SMsgCb* msgcb, EQueueType qtype, SRpcMsg* pMsg) {
if (msgcb == NULL) {
return -1;
}
ASSERT(msgcb != NULL);
int32_t code = (*msgcb->putToQueueFp)(msgcb->mgmt, qtype, pMsg);
if (code != 0) {
rpcFreeCont(pMsg->pCont);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册