提交 b90ee796 编写于 作者: B Benguang Zhao

fix: use syncLogBufferCommit in syncNodeOnLocalCmd

上级 c9c05761
......@@ -281,7 +281,7 @@ typedef enum ELogicConditionType {
#define TSDB_MULTI_TABLEMETA_MAX_NUM 100000 // maximum batch size allowed to load table meta
......@@ -1258,7 +1258,7 @@ static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SDataBlk *pDataBlk)
} else {
ASSERT(0 && "dup rows not allowed");
if (pBDataW->nRow >= pCommitter->maxRow) {
......@@ -1679,4 +1679,4 @@ _exit:
tsdbInfo("vgId:%d, tsdb rollback commit", TD_VID(pTsdb->pVnode));
return code;
\ No newline at end of file
......@@ -178,6 +178,13 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
return -1;
if (version <= pVnode->state.applied) {
vError("vgId:%d, duplicate write request. version: %" PRId64 ", applied: %" PRId64 "", TD_VID(pVnode), version,
pRsp->info.handle = NULL;
return -1;
vDebug("vgId:%d, start to process write request %s, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
......@@ -307,10 +307,10 @@ static void vnodeSyncApplyMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const S
rpcMsg.info.conn.applyTerm = pMeta->term;
const STraceId *trace = &pMsg->info.traceId;
vGInfo("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64
", weak:%d, code:%d, state:%d %s, type:%s",
pVnode->config.vgId, pFsm, pMeta->index, pMeta->term, rpcMsg.info.conn.applyIndex, pMeta->isWeak,
pMeta->code, pMeta->state, syncStr(pMeta->state), TMSG_INFO(pMsg->msgType));
vGTrace("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64
", weak:%d, code:%d, state:%d %s, type:%s",
pVnode->config.vgId, pFsm, pMeta->index, pMeta->term, rpcMsg.info.conn.applyIndex, pMeta->isWeak,
pMeta->code, pMeta->state, syncStr(pMeta->state), TMSG_INFO(pMsg->msgType));
tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg);
} else {
......@@ -195,9 +195,9 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
goto _IGNORE;
sInfo("vgId:%d, recv append entries msg. index:%" PRId64 ", term:%" PRId64 ", preLogIndex:%" PRId64
", prevLogTerm:%" PRId64 " commitIndex:%" PRId64 "",
pMsg->vgId, pMsg->prevLogIndex + 1, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex);
sTrace("vgId:%d, recv append entries msg. index:%" PRId64 ", term:%" PRId64 ", preLogIndex:%" PRId64
", prevLogTerm:%" PRId64 " commitIndex:%" PRId64 "",
pMsg->vgId, pMsg->prevLogIndex + 1, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex);
// accept
if (syncLogBufferAccept(ths->pLogBuf, ths, pEntry, pMsg->prevLogTerm) < 0) {
......@@ -64,8 +64,8 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
ASSERT(pMsg->term == ths->pRaftStore->currentTerm);
sInfo("vgId:%d received append entries reply. srcId:0x%016" PRIx64 ", term:%" PRId64 ", matchIndex:%" PRId64 "",
pMsg->vgId, pMsg->srcId.addr, pMsg->term, pMsg->matchIndex);
sTrace("vgId:%d received append entries reply. srcId:0x%016" PRIx64 ", term:%" PRId64 ", matchIndex:%" PRId64 "",
pMsg->vgId, pMsg->srcId.addr, pMsg->term, pMsg->matchIndex);
if (pMsg->success) {
SyncIndex oldMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId));
......@@ -324,8 +324,8 @@ int64_t syncNodeCheckCommitIndex(SSyncNode* ths, SyncIndex indexLikely) {
if (indexLikely > ths->commitIndex && syncNodeAgreedUpon(ths, indexLikely)) {
SyncIndex commitIndex = indexLikely;
syncNodeUpdateCommitIndex(ths, commitIndex);
sInfo("vgId:%d, agreed upon. role:%d, term:%" PRId64 ", index: %" PRId64 "", ths->vgId, ths->state,
ths->pRaftStore->currentTerm, commitIndex);
sTrace("vgId:%d, agreed upon. role:%d, term:%" PRId64 ", index: %" PRId64 "", ths->vgId, ths->state,
ths->pRaftStore->currentTerm, commitIndex);
return ths->commitIndex;
......@@ -595,7 +595,7 @@ void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
tstrncpy(pEp->fqdn, pSyncNode->pRaftCfg->cfg.nodeInfo[i].nodeFqdn, TSDB_FQDN_LEN);
pEp->port = (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodePort;
sInfo("vgId:%d, sync get retry epset, index:%d %s:%d", pSyncNode->vgId, i, pEp->fqdn, pEp->port);
sDebug("vgId:%d, sync get retry epset, index:%d %s:%d", pSyncNode->vgId, i, pEp->fqdn, pEp->port);
if (pEpSet->numOfEps > 0) {
pEpSet->inUse = (pSyncNode->pRaftCfg->cfg.myIndex + 1) % pEpSet->numOfEps;
......@@ -1028,6 +1028,12 @@ int32_t syncNodeRestore(SSyncNode* pSyncNode) {
SyncIndex lastVer = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
SyncIndex commitIndex = pSyncNode->pLogStore->syncLogCommitIndex(pSyncNode->pLogStore);
SyncIndex endIndex = pSyncNode->pLogBuf->endIndex;
if (lastVer != -1 && endIndex != lastVer + 1) {
sError("vgId:%d, failed to restore sync node since %s. expected lastLogIndex: %" PRId64 ", lastVer: %" PRId64 "",
pSyncNode->vgId, terrstr(), endIndex - 1, lastVer);
return -1;
ASSERT(endIndex == lastVer + 1);
commitIndex = TMAX(pSyncNode->commitIndex, commitIndex);
......@@ -2141,10 +2147,10 @@ int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
// proceed match index, with replicating on needed
SyncIndex matchIndex = syncLogBufferProceed(ths->pLogBuf, ths);
sInfo("vgId:%d, append raft entry. index: %" PRId64 ", term: %" PRId64 " pBuf: [%" PRId64 " %" PRId64 " %" PRId64
", %" PRId64 ")",
ths->vgId, pEntry->index, pEntry->term, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex);
sDebug("vgId:%d, append raft entry. index: %" PRId64 ", term: %" PRId64 " pBuf: [%" PRId64 " %" PRId64 " %" PRId64
", %" PRId64 ")",
ths->vgId, pEntry->index, pEntry->term, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex);
// multi replica
if (ths->replicaNum > 1) {
......@@ -2300,6 +2306,26 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
SyncLocalCmd* pMsg = pRpcMsg->pCont;
syncLogRecvLocalCmd(ths, pMsg, "");
if (pMsg->cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
syncNodeStepDown(ths, pMsg->sdNewTerm);
} else if (pMsg->cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT) {
(void)syncNodeUpdateCommitIndex(ths, pMsg->fcIndex);
if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) {
sError("vgId:%d, failed to commit raft log since %s. commit index: %" PRId64 "", ths->vgId, terrstr(),
} else {
sError("error local cmd");
return 0;
int32_t syncNodeOnLocalCmdOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
SyncLocalCmd* pMsg = pRpcMsg->pCont;
syncLogRecvLocalCmd(ths, pMsg, "");
if (pMsg->cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
syncNodeStepDown(ths, pMsg->sdNewTerm);
......@@ -2535,11 +2561,11 @@ int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) {
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) {
// return false;
return (ths->replicaNum == 1 && syncUtilUserCommit(pMsg->msgType) && ths->vgId != 1);
int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) {
if (beginIndex > endIndex) {
return 0;
......@@ -163,7 +163,6 @@ int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
if (pLogStore->syncLogGetEntry(pLogStore, index, &pEntry) < 0) {
sError("vgId:%d, failed to get log entry since %s. index:%" PRId64 "", pNode->vgId, terrstr(), index);
......@@ -334,7 +333,7 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
SyncTerm prevLogTerm = pBufEntry->prevLogTerm;
SSyncRaftEntry* pEntry = pBufEntry->pItem;
if (pEntry == NULL) {
sDebug("vgId:%d, cannot proceed match index in log buffer. no raft entry at next pos of matchIndex:%" PRId64,
sTrace("vgId:%d, cannot proceed match index in log buffer. no raft entry at next pos of matchIndex:%" PRId64,
pNode->vgId, pBuf->matchIndex);
goto _out;
......@@ -361,8 +360,8 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
// increase match index
pBuf->matchIndex = index;
sInfo("vgId:%d, log buffer proceed. start index: %" PRId64 ", match index: %" PRId64 ", end index: %" PRId64,
pNode->vgId, pBuf->startIndex, pBuf->matchIndex, pBuf->endIndex);
sTrace("vgId:%d, log buffer proceed. start index: %" PRId64 ", match index: %" PRId64 ", end index: %" PRId64,
pNode->vgId, pBuf->startIndex, pBuf->matchIndex, pBuf->endIndex);
// replicate on demand
......@@ -390,6 +389,10 @@ _out:
int32_t syncLogFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry) {
ASSERT(pFsm->FpCommitCb != NULL && "No commit cb registered for the FSM");
if ((pNode->replicaNum == 1) && pNode->restoreFinish && pNode->vgId != 1) {
return 0;
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg);
......@@ -439,7 +442,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
goto _out;
sDebug("vgId:%d, log buffer info. role: %d, term: %" PRId64 ". start index:%" PRId64 ", commit index:%" PRId64
sTrace("vgId:%d, log buffer info. role: %d, term: %" PRId64 ". start index:%" PRId64 ", commit index:%" PRId64
", match index: %" PRId64 ", end index:%" PRId64 "",
pNode->vgId, role, term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
......@@ -470,7 +473,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
pBuf->commitIndex = index;
sDebug("vgId:%d, committed index: %" PRId64 ", term: %" PRId64 ", role: %d, current term: %" PRId64 "", pNode->vgId,
sTrace("vgId:%d, committed index: %" PRId64 ", term: %" PRId64 ", role: %d, current term: %" PRId64 "", pNode->vgId,
pEntry->index, pEntry->term, role, term);
if (!inBuf) {
......@@ -480,8 +483,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
// recycle
SyncIndex used = pBuf->endIndex - pBuf->startIndex;
SyncIndex until = pBuf->commitIndex - (pBuf->size - used) / 2;
SyncIndex until = pBuf->commitIndex - (pBuf->size >> 4);
for (SyncIndex index = pBuf->startIndex; index < until; index++) {
SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem;
ASSERT(pEntry != NULL);
......@@ -687,7 +689,7 @@ int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode
SSyncLogBuffer* pBuf = pNode->pLogBuf;
sDebug("vgId:%d, attempted to probe the %d'th peer with msg of index:%" PRId64 " term: %" PRId64
sTrace("vgId:%d, attempted to probe the %d'th peer with msg of index:%" PRId64 " term: %" PRId64
". pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64 "), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
pNode->vgId, pMgr->peerId, index, term, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
......@@ -733,7 +735,7 @@ int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* p
SSyncLogBuffer* pBuf = pNode->pLogBuf;
sDebug("vgId:%d, attempted to replicate %d msgs to the %d'th peer. pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64
sTrace("vgId:%d, attempted to replicate %d msgs to the %d'th peer. pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64
"), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
pNode->vgId, count, pMgr->peerId, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
......@@ -920,8 +922,8 @@ int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Syn
(void)syncNodeSendAppendEntries(pNode, pDestId, &msgOut);
ret = 0;
sInfo("vgId:%d, replicate one msg index: %" PRId64 " term: %" PRId64 " prevterm: %" PRId64 " to dest: 0x%016" PRIx64,
pNode->vgId, pEntry->index, pEntry->term, prevLogTerm, pDestId->addr);
sTrace("vgId:%d, replicate one msg index: %" PRId64 " term: %" PRId64 " prevterm: %" PRId64 " to dest: 0x%016" PRIx64,
pNode->vgId, pEntry->index, pEntry->term, prevLogTerm, pDestId->addr);
if (!inBuf) {
......@@ -1246,6 +1246,7 @@ _return2:
tTrace("handle %p failed to send to release handle", exh);
return -1;
int transSendResponse(const STransMsg* msg) {
if (msg->info.noResp) {
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册