diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 7ab19c5c6ecb39bba5de29ef6f5dda013f2211c3..feecbc6d3bd6c416c9cc436213b344f5c099abe1 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -307,10 +307,10 @@ static void vnodeSyncApplyMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const S rpcMsg.info.conn.applyTerm = pMeta->term; const STraceId *trace = &pMsg->info.traceId; - vGTrace("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64 - ", weak:%d, code:%d, state:%d %s, type:%s", - pVnode->config.vgId, pFsm, pMeta->index, pMeta->term, rpcMsg.info.conn.applyIndex, pMeta->isWeak, - pMeta->code, pMeta->state, syncStr(pMeta->state), TMSG_INFO(pMsg->msgType)); + vGInfo("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64 + ", weak:%d, code:%d, state:%d %s, type:%s", + pVnode->config.vgId, pFsm, pMeta->index, pMeta->term, rpcMsg.info.conn.applyIndex, pMeta->isWeak, + pMeta->code, pMeta->state, syncStr(pMeta->state), TMSG_INFO(pMsg->msgType)); tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg); } else { @@ -558,12 +558,12 @@ bool vnodeIsLeader(SVnode *pVnode) { } else { terrno = TSDB_CODE_APP_NOT_READY; } - vDebug("vgId:%d, vnode not ready, state:%s, restore:%d", pVnode->config.vgId, syncStr(state.state), state.restored); + vInfo("vgId:%d, vnode not ready, state:%s, restore:%d", pVnode->config.vgId, syncStr(state.state), state.restored); return false; } if (!pVnode->restored) { - vDebug("vgId:%d, vnode not restored", pVnode->config.vgId); + vInfo("vgId:%d, vnode not restored", pVnode->config.vgId); terrno = TSDB_CODE_APP_NOT_READY; return false; } diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 0d892cdd1dc9eea85fb7f07c73ae8926832569e2..8197119b87d0ca59f8926a0b960924fd0c5b24de 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -245,19 +245,19 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt SyncTerm lastMatchTerm = syncLogBufferGetLastMatchTerm(pBuf); if (index <= pBuf->commitIndex) { - sDebug("vgId:%d, raft entry already committed. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64 - " %" PRId64 " %" PRId64 ", %" PRId64 ")", - pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, - pBuf->endIndex); + sInfo("vgId:%d, raft entry already committed. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64 + " %" PRId64 " %" PRId64 ", %" PRId64 ")", + pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, + pBuf->endIndex); ret = 0; goto _out; } if (index - pBuf->startIndex >= pBuf->size) { - sDebug("vgId:%d, raft entry out of buffer capacity. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64 - " %" PRId64 " %" PRId64 ", %" PRId64 ")", - pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, - pBuf->endIndex); + sInfo("vgId:%d, raft entry out of buffer capacity. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64 + " %" PRId64 " %" PRId64 ", %" PRId64 ")", + pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, + pBuf->endIndex); goto _out; } @@ -382,8 +382,8 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode) { // increase match index pBuf->matchIndex = index; - sDebug("vgId:%d, log buffer proceed. start index: %" PRId64 ", match index: %" PRId64 ", end index: %" PRId64, - pNode->vgId, pBuf->startIndex, pBuf->matchIndex, pBuf->endIndex); + sInfo("vgId:%d, log buffer proceed. start index: %" PRId64 ", match index: %" PRId64 ", end index: %" PRId64, + pNode->vgId, pBuf->startIndex, pBuf->matchIndex, pBuf->endIndex); // replicate on demand (void)syncNodeReplicate(pNode); @@ -586,9 +586,9 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) { goto _IGNORE; } - sDebug("vgId:%d, recv append entries msg. index:%" PRId64 ", term:%" PRId64 ", preLogIndex:%" PRId64 - ", prevLogTerm:%" PRId64 " commitIndex:%" PRId64 "", - pMsg->vgId, pMsg->prevLogIndex + 1, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex); + sInfo("vgId:%d, recv append entries msg. index:%" PRId64 ", term:%" PRId64 ", preLogIndex:%" PRId64 + ", prevLogTerm:%" PRId64 " commitIndex:%" PRId64 "", + pMsg->vgId, pMsg->prevLogIndex + 1, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex); // accept if (syncLogBufferAccept(ths->pLogBuf, ths, pEntry, pMsg->prevLogTerm) < 0) { diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 3a7513f276b27f259d99b2013091abcfc064583a..3295f030164b0627896a6cbc32034e17b4b454db 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -50,8 +50,8 @@ int64_t syncNodeCheckCommitIndex(SSyncNode* ths, SyncIndex indexLikely) { if (indexLikely > ths->commitIndex && syncNodeAgreedUpon(ths, indexLikely)) { SyncIndex commitIndex = indexLikely; syncNodeUpdateCommitIndex(ths, commitIndex); - sDebug("vgId:%d, agreed upon. role:%d, term:%" PRId64 ", index: %" PRId64 "", ths->vgId, ths->state, - ths->pRaftStore->currentTerm, commitIndex); + sInfo("vgId:%d, agreed upon. role:%d, term:%" PRId64 ", index: %" PRId64 "", ths->vgId, ths->state, + ths->pRaftStore->currentTerm, commitIndex); } return ths->commitIndex; } @@ -113,8 +113,8 @@ int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Syn (void)syncNodeSendAppendEntries(pNode, pDestId, &msgOut); ret = 0; - sDebug("vgId:%d, replicate one msg index: %" PRId64 " term: %" PRId64 " prevterm: %" PRId64 " to dest: 0x%016" PRIx64, - pNode->vgId, pEntry->index, pEntry->term, prevLogTerm, pDestId->addr); + sInfo("vgId:%d, replicate one msg index: %" PRId64 " term: %" PRId64 " prevterm: %" PRId64 " to dest: 0x%016" PRIx64, + pNode->vgId, pEntry->index, pEntry->term, prevLogTerm, pDestId->addr); if (!inBuf) { syncEntryDestroy(pEntry); @@ -157,8 +157,8 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ASSERT(pMsg->term == ths->pRaftStore->currentTerm); - sDebug("vgId:%d received append entries reply. srcId:0x%016" PRIx64 ", term:%" PRId64 ", matchIndex:%" PRId64 "", - pMsg->vgId, pMsg->srcId.addr, pMsg->term, pMsg->matchIndex); + sInfo("vgId:%d received append entries reply. srcId:0x%016" PRIx64 ", term:%" PRId64 ", matchIndex:%" PRId64 "", + pMsg->vgId, pMsg->srcId.addr, pMsg->term, pMsg->matchIndex); if (pMsg->success) { SyncIndex oldMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId)); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index a54fbd294b5cddef9497631c328c6a0f6ff5c151..7e19b9aa5ca86a6c3f02493a293b00c7adc98343 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -93,13 +93,13 @@ int32_t syncStart(int64_t rid) { goto _err; } - if (syncNodeStart(pSyncNode) < 0) { - sError("vgId:%d, failed to start sync node since %s", pSyncNode->vgId, terrstr()); - goto _err; - } + if (syncNodeStart(pSyncNode) < 0) { + sError("vgId:%d, failed to start sync node since %s", pSyncNode->vgId, terrstr()); + goto _err; + } - syncNodeRelease(pSyncNode); - return 0; + syncNodeRelease(pSyncNode); + return 0; _err: syncNodeRelease(pSyncNode); @@ -524,13 +524,8 @@ int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapsho pSnapshot->lastApplyTerm = pEntry->term; pSnapshot->lastConfigIndex = syncNodeGetSnapshotConfigIndex(pSyncNode, index); -<<<<<<< HEAD - syncEntryDestroy(pEntry); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); -======= syncEntryDestroy(pEntry); syncNodeRelease(pSyncNode); ->>>>>>> 3.0 return 0; } @@ -771,7 +766,7 @@ _out: int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) { SSyncLogBuffer* pBuf = pNode->pLogBuf; - SRaftId destId = pMsg->srcId; + SRaftId destId = pMsg->srcId; ASSERT(pMgr->restored == false); char host[64]; uint16_t port; @@ -2508,10 +2503,10 @@ int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) { // proceed match index, with replicating on needed SyncIndex matchIndex = syncLogBufferProceed(ths->pLogBuf, ths); - sDebug("vgId:%d, append raft entry. index: %" PRId64 ", term: %" PRId64 " pBuf: [%" PRId64 " %" PRId64 " %" PRId64 - ", %" PRId64 ")", - ths->vgId, pEntry->index, pEntry->term, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex, - ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex); + sInfo("vgId:%d, append raft entry. index: %" PRId64 ", term: %" PRId64 " pBuf: [%" PRId64 " %" PRId64 " %" PRId64 + ", %" PRId64 ")", + ths->vgId, pEntry->index, pEntry->term, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex, + ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex); // multi replica if (ths->replicaNum > 1) { @@ -2646,7 +2641,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { SyncHeartbeatReply* pMsg = pRpcMsg->pCont; - SSyncLogReplMgr* pMgr = syncNodeGetLogReplMgr(ths, &pMsg->srcId); + SSyncLogReplMgr* pMgr = syncNodeGetLogReplMgr(ths, &pMsg->srcId); if (pMgr == NULL) { sError("vgId:%d, failed to get log repl mgr for the peer at addr 0x016%" PRIx64 "", ths->vgId, pMsg->srcId.addr); return -1; @@ -2797,17 +2792,6 @@ int32_t syncLogToAppendEntries(SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTer return 0; } -#if 0 -void syncLogReplicateAppendEntries(SSyncNode* pNode, SyncAppendEntries* pMsg) { - for (int i = 0; i < pNode->replicaNum; i++) { - SRaftId* pDestId = &pNode->peersId[i]; - if (!syncUtilSameId(pDestId, &pNode->myRaftId)) { - (void)syncNodeSendAppendEntries(pNode, pDestId, pMsg); - } - } -} -#endif - // TLA+ Spec // ClientRequest(i, v) == // /\ state[i] = Leader @@ -2824,9 +2808,9 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn int32_t code = 0; - SyncIndex index = syncLogBufferGetEndIndex(ths->pLogBuf); - SyncTerm term = ths->pRaftStore->currentTerm; - SSyncRaftEntry* pEntry = NULL; + SyncIndex index = syncLogBufferGetEndIndex(ths->pLogBuf); + SyncTerm term = ths->pRaftStore->currentTerm; + SSyncRaftEntry* pEntry = NULL; if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { pEntry = syncEntryBuildFromClientRequest(pMsg->pCont, term, index); } else { @@ -2841,7 +2825,7 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn return syncNodeAppend(ths, pEntry); } - return 0; + return -1; } int32_t syncNodeOnClientRequestOld(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex) {