未验证 提交 a4c92217 编写于 作者: L Li Minghao 提交者: GitHub

Merge pull request #15426 from taosdata/feature/3.0_mhli

refactor(sync): add pre-commit interface
......@@ -206,13 +206,13 @@ static void inline vnodeProposeBatchMsg(SVnode *pVnode, SRpcMsg **pMsgArr, bool
}
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnode *pVnode = pInfo->ahandle;
int32_t vgId = pVnode->config.vgId;
int32_t code = 0;
SRpcMsg *pMsg = NULL;
int32_t arrayPos = 0;
SRpcMsg **pMsgArr = taosMemoryCalloc(numOfMsgs, sizeof(SRpcMsg*));
bool *pIsWeakArr = taosMemoryCalloc(numOfMsgs, sizeof(bool));
SVnode *pVnode = pInfo->ahandle;
int32_t vgId = pVnode->config.vgId;
int32_t code = 0;
SRpcMsg *pMsg = NULL;
int32_t arrayPos = 0;
SRpcMsg **pMsgArr = taosMemoryCalloc(numOfMsgs, sizeof(SRpcMsg *));
bool *pIsWeakArr = taosMemoryCalloc(numOfMsgs, sizeof(bool));
vTrace("vgId:%d, get %d msgs from vnode-write queue", vgId, numOfMsgs);
for (int32_t msg = 0; msg < numOfMsgs; msg++) {
......@@ -506,7 +506,7 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c
syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state,
syncUtilState2String(cbMeta.state), pMsg->msgType, TMSG_INFO(pMsg->msgType));
if (cbMeta.code == 0) {
if (cbMeta.code == 0 && cbMeta.isWeak == 0) {
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
......@@ -529,6 +529,23 @@ static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMet
vTrace("vgId:%d, pre-commit-cb is excuted, fsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s, msgtype:%d %s",
syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state,
syncUtilState2String(cbMeta.state), pMsg->msgType, TMSG_INFO(pMsg->msgType));
if (cbMeta.code == 0 && cbMeta.isWeak == 1) {
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info);
rpcMsg.info.conn.applyIndex = cbMeta.index;
rpcMsg.info.conn.applyTerm = cbMeta.term;
tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg);
} else {
SRpcMsg rsp = {.code = cbMeta.code, .info = pMsg->info};
vError("vgId:%d, sync pre-commit error, msgtype:%d,%s, error:0x%X, errmsg:%s", syncGetVgId(pVnode->sync),
pMsg->msgType, TMSG_INFO(pMsg->msgType), cbMeta.code, tstrerror(cbMeta.code));
if (rsp.info.handle != NULL) {
tmsgSendRsp(&rsp);
}
}
}
static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
......
......@@ -238,6 +238,7 @@ int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncInd
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg);
int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag);
int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry, int32_t code);
int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg);
......
......@@ -244,22 +244,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry);
// pre commit
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
if (ths->pFsm != NULL) {
// if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_SYNC_NOOP) {
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) {
SFsmCbMeta cbMeta = {0};
cbMeta.index = pAppendEntry->index;
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
cbMeta.isWeak = pAppendEntry->isWeak;
cbMeta.code = 2;
cbMeta.state = ths->state;
cbMeta.seqNum = pAppendEntry->seqNum;
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta);
}
}
rpcFreeCont(rpcMsg.pCont);
syncNodePreCommit(ths, pAppendEntry, 0);
}
// free memory
......@@ -280,22 +265,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry);
// pre commit
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
if (ths->pFsm != NULL) {
// if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_SYNC_NOOP) {
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) {
SFsmCbMeta cbMeta = {0};
cbMeta.index = pAppendEntry->index;
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
cbMeta.isWeak = pAppendEntry->isWeak;
cbMeta.code = 3;
cbMeta.state = ths->state;
cbMeta.seqNum = pAppendEntry->seqNum;
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta);
}
}
rpcFreeCont(rpcMsg.pCont);
syncNodePreCommit(ths, pAppendEntry, 0);
// free memory
syncEntryDestory(pAppendEntry);
......@@ -440,7 +410,7 @@ static int32_t syncNodeDoMakeLogSame(SSyncNode* ths, SyncIndex FromIndex) {
return code;
}
static int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry) {
int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry, int32_t code) {
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg);
......@@ -456,7 +426,7 @@ static int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry) {
cbMeta.index = pEntry->index;
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
cbMeta.isWeak = pEntry->isWeak;
cbMeta.code = 2;
cbMeta.code = code;
cbMeta.state = ths->state;
cbMeta.seqNum = pEntry->seqNum;
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta);
......@@ -594,7 +564,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
return -1;
}
code = syncNodePreCommit(ths, pAppendEntry);
code = syncNodePreCommit(ths, pAppendEntry, 0);
ASSERT(code == 0);
// syncEntryDestory(pAppendEntry);
......@@ -715,7 +685,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
return -1;
}
code = syncNodePreCommit(ths, pAppendEntry);
code = syncNodePreCommit(ths, pAppendEntry, 0);
ASSERT(code == 0);
// syncEntryDestory(pAppendEntry);
......@@ -919,7 +889,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
}
// pre commit
code = syncNodePreCommit(ths, pAppendEntry);
code = syncNodePreCommit(ths, pAppendEntry, 0);
ASSERT(code == 0);
// update match index
......@@ -1032,7 +1002,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
}
// pre commit
code = syncNodePreCommit(ths, pAppendEntry);
code = syncNodePreCommit(ths, pAppendEntry, 0);
ASSERT(code == 0);
syncEntryDestory(pAppendEntry);
......
......@@ -67,11 +67,6 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
for (SyncIndex index = syncNodeGetLastIndex(pSyncNode); index > pSyncNode->commitIndex; --index) {
bool agree = syncAgree(pSyncNode, index);
if (gRaftDetailLog) {
sTrace("syncMaybeAdvanceCommitIndex syncAgree:%d, index:%" PRId64 ", pSyncNode->commitIndex:%" PRId64, agree,
index, pSyncNode->commitIndex);
}
if (agree) {
// term
SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, index);
......@@ -82,20 +77,15 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
// update commit index
newCommitIndex = index;
if (gRaftDetailLog) {
sTrace("syncMaybeAdvanceCommitIndex maybe to update, newCommitIndex:%" PRId64
" commit, pSyncNode->commitIndex:%" PRId64,
newCommitIndex, pSyncNode->commitIndex);
}
syncEntryDestory(pEntry);
break;
} else {
if (gRaftDetailLog) {
sTrace("syncMaybeAdvanceCommitIndex can not commit due to term not equal, pEntry->term:%" PRIu64
", pSyncNode->pRaftStore->currentTerm:%" PRIu64,
pEntry->term, pSyncNode->pRaftStore->currentTerm);
}
do {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "can not commit due to term not equal, index:%ld, term:%lu", pEntry->index,
pEntry->term);
syncNodeEventLog(pSyncNode, logBuf);
} while (0);
}
syncEntryDestory(pEntry);
......@@ -107,10 +97,6 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
SyncIndex beginIndex = pSyncNode->commitIndex + 1;
SyncIndex endIndex = newCommitIndex;
if (gRaftDetailLog) {
sTrace("syncMaybeAdvanceCommitIndex sync commit %" PRId64, newCommitIndex);
}
// update commit index
pSyncNode->commitIndex = newCommitIndex;
......
......@@ -2504,22 +2504,7 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncI
}
// pre commit
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg);
if (ths->pFsm != NULL) {
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) {
SFsmCbMeta cbMeta = {0};
cbMeta.index = pEntry->index;
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
cbMeta.isWeak = pEntry->isWeak;
cbMeta.code = 0;
cbMeta.state = ths->state;
cbMeta.seqNum = pEntry->seqNum;
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta);
}
}
rpcFreeCont(rpcMsg.pCont);
syncNodePreCommit(ths, pEntry, 0);
// if only myself, maybe commit right now
if (ths->replicaNum == 1) {
......@@ -2528,22 +2513,7 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncI
} else {
// pre commit
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg);
if (ths->pFsm != NULL) {
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) {
SFsmCbMeta cbMeta = {0};
cbMeta.index = pEntry->index;
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
cbMeta.isWeak = pEntry->isWeak;
cbMeta.code = 1;
cbMeta.state = ths->state;
cbMeta.seqNum = pEntry->seqNum;
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta);
}
}
rpcFreeCont(rpcMsg.pCont);
syncNodePreCommit(ths, pEntry, 0);
}
if (pRetIndex != NULL) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册