From b90ee796236f4ce551a36c93d3eaff109d81e8d3 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Mon, 21 Nov 2022 13:51:01 +0800 Subject: [PATCH] fix: use syncLogBufferCommit in syncNodeOnLocalCmd --- include/util/tdef.h | 2 +- source/dnode/vnode/src/tsdb/tsdbCommit.c | 4 +- source/dnode/vnode/src/vnd/vnodeSvr.c | 7 ++++ source/dnode/vnode/src/vnd/vnodeSync.c | 8 ++-- source/libs/sync/src/syncAppendEntries.c | 6 +-- source/libs/sync/src/syncAppendEntriesReply.c | 4 +- source/libs/sync/src/syncCommit.c | 4 +- source/libs/sync/src/syncMain.c | 38 ++++++++++++++++--- source/libs/sync/src/syncPipeline.c | 26 +++++++------ source/libs/transport/src/transSvr.c | 1 + 10 files changed, 68 insertions(+), 32 deletions(-) diff --git a/include/util/tdef.h b/include/util/tdef.h index ef0eca4db3..124d98e3dc 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -281,7 +281,7 @@ typedef enum ELogicConditionType { #define TSDB_DNODE_ROLE_VNODE 2 #define TSDB_MAX_REPLICA 5 -#define TSDB_SYNC_LOG_BUFFER_SIZE 1024 +#define TSDB_SYNC_LOG_BUFFER_SIZE 4096 #define TSDB_TBNAME_COLUMN_INDEX (-1) #define TSDB_MULTI_TABLEMETA_MAX_NUM 100000 // maximum batch size allowed to load table meta diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 65a46331aa..65fd266083 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -1258,7 +1258,7 @@ static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) } } } else { - ASSERT(0); + ASSERT(0 && "dup rows not allowed"); } if (pBDataW->nRow >= pCommitter->maxRow) { @@ -1679,4 +1679,4 @@ _exit: tsdbInfo("vgId:%d, tsdb rollback commit", TD_VID(pTsdb->pVnode)); } return code; -} \ No newline at end of file +} diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 5c8c166833..4abbeb61b4 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -178,6 +178,13 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp return -1; } + if (version <= pVnode->state.applied) { + vError("vgId:%d, duplicate write request. version: %" PRId64 ", applied: %" PRId64 "", TD_VID(pVnode), version, + pVnode->state.applied); + pRsp->info.handle = NULL; + return -1; + } + vDebug("vgId:%d, start to process write request %s, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index c67ee41b12..6837dd1341 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -307,10 +307,10 @@ static void vnodeSyncApplyMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const S rpcMsg.info.conn.applyTerm = pMeta->term; const STraceId *trace = &pMsg->info.traceId; - vGInfo("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64 - ", weak:%d, code:%d, state:%d %s, type:%s", - pVnode->config.vgId, pFsm, pMeta->index, pMeta->term, rpcMsg.info.conn.applyIndex, pMeta->isWeak, - pMeta->code, pMeta->state, syncStr(pMeta->state), TMSG_INFO(pMsg->msgType)); + vGTrace("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64 + ", weak:%d, code:%d, state:%d %s, type:%s", + pVnode->config.vgId, pFsm, pMeta->index, pMeta->term, rpcMsg.info.conn.applyIndex, pMeta->isWeak, + pMeta->code, pMeta->state, syncStr(pMeta->state), TMSG_INFO(pMsg->msgType)); tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg); } else { diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index f2bdf6be70..9634f4ee26 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -195,9 +195,9 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) { goto _IGNORE; } - sInfo("vgId:%d, recv append entries msg. index:%" PRId64 ", term:%" PRId64 ", preLogIndex:%" PRId64 - ", prevLogTerm:%" PRId64 " commitIndex:%" PRId64 "", - pMsg->vgId, pMsg->prevLogIndex + 1, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex); + sTrace("vgId:%d, recv append entries msg. index:%" PRId64 ", term:%" PRId64 ", preLogIndex:%" PRId64 + ", prevLogTerm:%" PRId64 " commitIndex:%" PRId64 "", + pMsg->vgId, pMsg->prevLogIndex + 1, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex); // accept if (syncLogBufferAccept(ths->pLogBuf, ths, pEntry, pMsg->prevLogTerm) < 0) { diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index ad388e193a..32e424666b 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -64,8 +64,8 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ASSERT(pMsg->term == ths->pRaftStore->currentTerm); - sInfo("vgId:%d received append entries reply. srcId:0x%016" PRIx64 ", term:%" PRId64 ", matchIndex:%" PRId64 "", - pMsg->vgId, pMsg->srcId.addr, pMsg->term, pMsg->matchIndex); + sTrace("vgId:%d received append entries reply. srcId:0x%016" PRIx64 ", term:%" PRId64 ", matchIndex:%" PRId64 "", + pMsg->vgId, pMsg->srcId.addr, pMsg->term, pMsg->matchIndex); if (pMsg->success) { SyncIndex oldMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId)); diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index fc7ea7cc30..5d4298552d 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -324,8 +324,8 @@ int64_t syncNodeCheckCommitIndex(SSyncNode* ths, SyncIndex indexLikely) { if (indexLikely > ths->commitIndex && syncNodeAgreedUpon(ths, indexLikely)) { SyncIndex commitIndex = indexLikely; syncNodeUpdateCommitIndex(ths, commitIndex); - sInfo("vgId:%d, agreed upon. role:%d, term:%" PRId64 ", index: %" PRId64 "", ths->vgId, ths->state, - ths->pRaftStore->currentTerm, commitIndex); + sTrace("vgId:%d, agreed upon. role:%d, term:%" PRId64 ", index: %" PRId64 "", ths->vgId, ths->state, + ths->pRaftStore->currentTerm, commitIndex); } return ths->commitIndex; } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index c95a3aba62..8177f3b6db 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -595,7 +595,7 @@ void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) { tstrncpy(pEp->fqdn, pSyncNode->pRaftCfg->cfg.nodeInfo[i].nodeFqdn, TSDB_FQDN_LEN); pEp->port = (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodePort; pEpSet->numOfEps++; - sInfo("vgId:%d, sync get retry epset, index:%d %s:%d", pSyncNode->vgId, i, pEp->fqdn, pEp->port); + sDebug("vgId:%d, sync get retry epset, index:%d %s:%d", pSyncNode->vgId, i, pEp->fqdn, pEp->port); } if (pEpSet->numOfEps > 0) { pEpSet->inUse = (pSyncNode->pRaftCfg->cfg.myIndex + 1) % pEpSet->numOfEps; @@ -1028,6 +1028,12 @@ int32_t syncNodeRestore(SSyncNode* pSyncNode) { SyncIndex lastVer = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); SyncIndex commitIndex = pSyncNode->pLogStore->syncLogCommitIndex(pSyncNode->pLogStore); SyncIndex endIndex = pSyncNode->pLogBuf->endIndex; + if (lastVer != -1 && endIndex != lastVer + 1) { + terrno = TSDB_CODE_WAL_LOG_INCOMPLETE; + sError("vgId:%d, failed to restore sync node since %s. expected lastLogIndex: %" PRId64 ", lastVer: %" PRId64 "", + pSyncNode->vgId, terrstr(), endIndex - 1, lastVer); + return -1; + } ASSERT(endIndex == lastVer + 1); commitIndex = TMAX(pSyncNode->commitIndex, commitIndex); @@ -2141,10 +2147,10 @@ int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) { // proceed match index, with replicating on needed SyncIndex matchIndex = syncLogBufferProceed(ths->pLogBuf, ths); - sInfo("vgId:%d, append raft entry. index: %" PRId64 ", term: %" PRId64 " pBuf: [%" PRId64 " %" PRId64 " %" PRId64 - ", %" PRId64 ")", - ths->vgId, pEntry->index, pEntry->term, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex, - ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex); + sDebug("vgId:%d, append raft entry. index: %" PRId64 ", term: %" PRId64 " pBuf: [%" PRId64 " %" PRId64 " %" PRId64 + ", %" PRId64 ")", + ths->vgId, pEntry->index, pEntry->term, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex, + ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex); // multi replica if (ths->replicaNum > 1) { @@ -2300,6 +2306,26 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) { SyncLocalCmd* pMsg = pRpcMsg->pCont; syncLogRecvLocalCmd(ths, pMsg, ""); + if (pMsg->cmd == SYNC_LOCAL_CMD_STEP_DOWN) { + syncNodeStepDown(ths, pMsg->sdNewTerm); + + } else if (pMsg->cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT) { + (void)syncNodeUpdateCommitIndex(ths, pMsg->fcIndex); + if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) { + sError("vgId:%d, failed to commit raft log since %s. commit index: %" PRId64 "", ths->vgId, terrstr(), + ths->commitIndex); + } + } else { + sError("error local cmd"); + } + + return 0; +} + +int32_t syncNodeOnLocalCmdOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) { + SyncLocalCmd* pMsg = pRpcMsg->pCont; + syncLogRecvLocalCmd(ths, pMsg, ""); + if (pMsg->cmd == SYNC_LOCAL_CMD_STEP_DOWN) { syncNodeStepDown(ths, pMsg->sdNewTerm); @@ -2535,11 +2561,11 @@ int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) { } bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) { - // return false; return (ths->replicaNum == 1 && syncUtilUserCommit(pMsg->msgType) && ths->vgId != 1); } int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) { + ASSERT(false); if (beginIndex > endIndex) { return 0; } diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 501efa8782..656b32bd7b 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -163,7 +163,6 @@ int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) { if (pLogStore->syncLogGetEntry(pLogStore, index, &pEntry) < 0) { sError("vgId:%d, failed to get log entry since %s. index:%" PRId64 "", pNode->vgId, terrstr(), index); - ASSERT(0); break; } @@ -334,7 +333,7 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode) { SyncTerm prevLogTerm = pBufEntry->prevLogTerm; SSyncRaftEntry* pEntry = pBufEntry->pItem; if (pEntry == NULL) { - sDebug("vgId:%d, cannot proceed match index in log buffer. no raft entry at next pos of matchIndex:%" PRId64, + sTrace("vgId:%d, cannot proceed match index in log buffer. no raft entry at next pos of matchIndex:%" PRId64, pNode->vgId, pBuf->matchIndex); goto _out; } @@ -361,8 +360,8 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode) { // increase match index pBuf->matchIndex = index; - sInfo("vgId:%d, log buffer proceed. start index: %" PRId64 ", match index: %" PRId64 ", end index: %" PRId64, - pNode->vgId, pBuf->startIndex, pBuf->matchIndex, pBuf->endIndex); + sTrace("vgId:%d, log buffer proceed. start index: %" PRId64 ", match index: %" PRId64 ", end index: %" PRId64, + pNode->vgId, pBuf->startIndex, pBuf->matchIndex, pBuf->endIndex); // replicate on demand (void)syncNodeReplicate(pNode); @@ -390,6 +389,10 @@ _out: int32_t syncLogFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry) { ASSERT(pFsm->FpCommitCb != NULL && "No commit cb registered for the FSM"); + if ((pNode->replicaNum == 1) && pNode->restoreFinish && pNode->vgId != 1) { + return 0; + } + SRpcMsg rpcMsg; syncEntry2OriginalRpc(pEntry, &rpcMsg); @@ -439,7 +442,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm goto _out; } - sDebug("vgId:%d, log buffer info. role: %d, term: %" PRId64 ". start index:%" PRId64 ", commit index:%" PRId64 + sTrace("vgId:%d, log buffer info. role: %d, term: %" PRId64 ". start index:%" PRId64 ", commit index:%" PRId64 ", match index: %" PRId64 ", end index:%" PRId64 "", pNode->vgId, role, term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); @@ -470,7 +473,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm } pBuf->commitIndex = index; - sDebug("vgId:%d, committed index: %" PRId64 ", term: %" PRId64 ", role: %d, current term: %" PRId64 "", pNode->vgId, + sTrace("vgId:%d, committed index: %" PRId64 ", term: %" PRId64 ", role: %d, current term: %" PRId64 "", pNode->vgId, pEntry->index, pEntry->term, role, term); if (!inBuf) { @@ -480,8 +483,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm } // recycle - SyncIndex used = pBuf->endIndex - pBuf->startIndex; - SyncIndex until = pBuf->commitIndex - (pBuf->size - used) / 2; + SyncIndex until = pBuf->commitIndex - (pBuf->size >> 4); for (SyncIndex index = pBuf->startIndex; index < until; index++) { SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem; ASSERT(pEntry != NULL); @@ -687,7 +689,7 @@ int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode } SSyncLogBuffer* pBuf = pNode->pLogBuf; - sDebug("vgId:%d, attempted to probe the %d'th peer with msg of index:%" PRId64 " term: %" PRId64 + sTrace("vgId:%d, attempted to probe the %d'th peer with msg of index:%" PRId64 " term: %" PRId64 ". pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64 "), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, pMgr->peerId, index, term, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, @@ -733,7 +735,7 @@ int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* p } SSyncLogBuffer* pBuf = pNode->pLogBuf; - sDebug("vgId:%d, attempted to replicate %d msgs to the %d'th peer. pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64 + sTrace("vgId:%d, attempted to replicate %d msgs to the %d'th peer. pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64 "), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, count, pMgr->peerId, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); @@ -920,8 +922,8 @@ int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Syn (void)syncNodeSendAppendEntries(pNode, pDestId, &msgOut); ret = 0; - sInfo("vgId:%d, replicate one msg index: %" PRId64 " term: %" PRId64 " prevterm: %" PRId64 " to dest: 0x%016" PRIx64, - pNode->vgId, pEntry->index, pEntry->term, prevLogTerm, pDestId->addr); + sTrace("vgId:%d, replicate one msg index: %" PRId64 " term: %" PRId64 " prevterm: %" PRId64 " to dest: 0x%016" PRIx64, + pNode->vgId, pEntry->index, pEntry->term, prevLogTerm, pDestId->addr); if (!inBuf) { syncEntryDestroy(pEntry); diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 5f36d91023..ef59b54124 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -1246,6 +1246,7 @@ _return2: tTrace("handle %p failed to send to release handle", exh); return -1; } + int transSendResponse(const STransMsg* msg) { if (msg->info.noResp) { rpcFreeCont(msg->pCont); -- GitLab