diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 74a73f6b10f08f3571cbdfc9c0339b93aa161ab9..a27be95049d64b8ff68bc2f75fd62e49f32a47cd 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -139,6 +139,7 @@ typedef struct SSyncFSM { void (*FpReConfigCb)(const struct SSyncFSM* pFsm, const SRpcMsg* pMsg, const SReConfigCbMeta* pMeta); void (*FpLeaderTransferCb)(const struct SSyncFSM* pFsm, const SRpcMsg* pMsg, const SFsmCbMeta* pMeta); bool (*FpApplyQueueEmptyCb)(const struct SSyncFSM* pFsm); + int32_t (*FpApplyQueueItems)(const struct SSyncFSM* pFsm); void (*FpBecomeLeaderCb)(const struct SSyncFSM* pFsm); void (*FpBecomeFollowerCb)(const struct SSyncFSM* pFsm); diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index ebab83cf6dc0620f2d59a1c84a9759d9e3d16e48..450d7a30ed1f0863c4bfafcd0725a5151ff00146 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -205,8 +205,23 @@ static void mndBecomeLeader(const SSyncFSM *pFsm) { static bool mndApplyQueueEmpty(const SSyncFSM *pFsm) { SMnode *pMnode = pFsm->data; - int32_t itemSize = tmsgGetQueueSize(&pMnode->msgCb, 1, APPLY_QUEUE); - return (itemSize == 0); + if (pMnode != NULL && pMnode->msgCb.qsizeFp != NULL) { + int32_t itemSize = tmsgGetQueueSize(&pMnode->msgCb, 1, APPLY_QUEUE); + return (itemSize == 0); + } else { + return true; + } +} + +static int32_t mndApplyQueueItems(const SSyncFSM *pFsm) { + SMnode *pMnode = pFsm->data; + + if (pMnode != NULL && pMnode->msgCb.qsizeFp != NULL) { + int32_t itemSize = tmsgGetQueueSize(&pMnode->msgCb, 1, APPLY_QUEUE); + return itemSize; + } else { + return -1; + } } SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) { @@ -218,6 +233,7 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) { pFsm->FpRestoreFinishCb = mndRestoreFinish; pFsm->FpLeaderTransferCb = NULL; pFsm->FpApplyQueueEmptyCb = mndApplyQueueEmpty; + pFsm->FpApplyQueueItems = mndApplyQueueItems; pFsm->FpReConfigCb = NULL; pFsm->FpBecomeLeaderCb = mndBecomeLeader; pFsm->FpBecomeFollowerCb = mndBecomeFollower; @@ -291,7 +307,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { SRpcMsg req = {.msgType = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)}; if (req.contLen <= 0) return -1; - + req.pCont = rpcMallocCont(req.contLen); if (req.pCont == NULL) return -1; memcpy(req.pCont, pRaw, req.contLen); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index e7f8c9f562bbc3b193bc9236f3f0a2243a2d7fce..38cb534d7f3e7f8343893db38b286b2da2eabdae 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -438,8 +438,24 @@ static void vnodeBecomeLeader(const SSyncFSM *pFsm) { static bool vnodeApplyQueueEmpty(const SSyncFSM *pFsm) { SVnode *pVnode = pFsm->data; - int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE); - return (itemSize == 0); + + if (pVnode != NULL && pVnode->msgCb.qsizeFp != NULL) { + int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE); + return (itemSize == 0); + } else { + return true; + } +} + +static int32_t vnodeApplyQueueItems(const SSyncFSM *pFsm) { + SVnode *pVnode = pFsm->data; + + if (pVnode != NULL && pVnode->msgCb.qsizeFp != NULL) { + int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE); + return itemSize; + } else { + return -1; + } } static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { @@ -452,6 +468,7 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { pFsm->FpRestoreFinishCb = vnodeRestoreFinish; pFsm->FpLeaderTransferCb = NULL; pFsm->FpApplyQueueEmptyCb = vnodeApplyQueueEmpty; + pFsm->FpApplyQueueItems = vnodeApplyQueueItems; pFsm->FpBecomeLeaderCb = vnodeBecomeLeader; pFsm->FpBecomeFollowerCb = vnodeBecomeFollower; pFsm->FpReConfigCb = NULL; diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 4f3f1c2c00681ecab8b24c515198f3c37166c52a..792ce67cd4c9be0eef09fe6b11ba0857ffb92f1d 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -246,7 +246,8 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) { } else { // error char logBuf[128]; - snprintf(logBuf, sizeof(logBuf), "ignore, get local entry error, append-index:%" PRId64, appendIndex); + snprintf(logBuf, sizeof(logBuf), "ignore, get local entry error, append-index:%" PRId64 " err:%d", appendIndex, + terrno); syncLogRecvAppendEntries(ths, pMsg, logBuf); syncEntryDestory(pLocalEntry); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index b4ca3f5b0166c8612c106ca2c2f0028bb317094e..15577151bae528120f81b591b54730d4c2d27540 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1388,7 +1388,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde char host[128]; uint16_t port; syncUtilU642Addr((pSyncNode->replicasId)[i].addr, host, sizeof(host), &port); - sNTrace(pSyncNode, "snapshot sender reset for: %" PRIu64 ", newIndex:%d, %s:%d, %p", + sNTrace(pSyncNode, "snapshot sender reset for: %" PRId64 ", newIndex:%d, %s:%d, %p", (pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[j]); (pSyncNode->senders)[i] = oldSenders[j]; @@ -1457,7 +1457,7 @@ void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) { if (term > pSyncNode->pRaftStore->currentTerm) { raftStoreSetTerm(pSyncNode->pRaftStore, term); char tmpBuf[64]; - snprintf(tmpBuf, sizeof(tmpBuf), "update term to %" PRIu64, term); + snprintf(tmpBuf, sizeof(tmpBuf), "update term to %" PRId64, term); syncNodeBecomeFollower(pSyncNode, tmpBuf); raftStoreClearVote(pSyncNode->pRaftStore); } @@ -1471,20 +1471,20 @@ void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) { void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) { if (pSyncNode->pRaftStore->currentTerm > newTerm) { - sNTrace(pSyncNode, "step down, ignore, new-term:%" PRIu64 ", current-term:%" PRIu64, newTerm, + sNTrace(pSyncNode, "step down, ignore, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, pSyncNode->pRaftStore->currentTerm); return; } do { - sNTrace(pSyncNode, "step down, new-term:%" PRIu64 ", current-term:%" PRIu64, newTerm, + sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, pSyncNode->pRaftStore->currentTerm); } while (0); if (pSyncNode->pRaftStore->currentTerm < newTerm) { raftStoreSetTerm(pSyncNode->pRaftStore, newTerm); char tmpBuf[64]; - snprintf(tmpBuf, sizeof(tmpBuf), "step down, update term to %" PRIu64, newTerm); + snprintf(tmpBuf, sizeof(tmpBuf), "step down, update term to %" PRId64, newTerm); syncNodeBecomeFollower(pSyncNode, tmpBuf); raftStoreClearVote(pSyncNode->pRaftStore); @@ -1803,7 +1803,7 @@ SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) { } } - sNError(pSyncNode, "sync node get pre term error, index:%" PRId64 ", snap-index:%" PRId64 ", snap-term:%" PRIu64, + sNError(pSyncNode, "sync node get pre term error, index:%" PRId64 ", snap-index:%" PRId64 ", snap-term:%" PRId64, index, snapshot.lastApplyIndex, snapshot.lastApplyTerm); return SYNC_TERM_INVALID; } @@ -1844,7 +1844,7 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { } } else { - sTrace("==syncNodeEqPingTimer== pingTimerLogicClock:%" PRIu64 ", pingTimerLogicClockUser:%" PRIu64, + sTrace("==syncNodeEqPingTimer== pingTimerLogicClock:%" PRId64 ", pingTimerLogicClockUser:%" PRId64, pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser); } } @@ -1866,7 +1866,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { taosMemoryFree(pElectTimer); return; } - sNTrace(pSyncNode, "eq elect timer lc:%" PRIu64, pSyncMsg->logicClock); + sNTrace(pSyncNode, "eq elect timer lc:%" PRId64, pSyncMsg->logicClock); } else { sTrace("syncNodeEqElectTimer syncEqMsg is NULL"); } @@ -1919,7 +1919,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { sError("sync env is stop, syncNodeEqHeartbeatTimer"); } } else { - sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRIu64 ", heartbeatTimerLogicClockUser:%" PRIu64 + sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRId64 ", heartbeatTimerLogicClockUser:%" PRId64 "", pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser); } @@ -1981,7 +1981,7 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) { } } else { - sTrace("==syncNodeEqPeerHeartbeatTimer== timerLogicClock:%" PRIu64 ", msgLogicClock:%" PRIu64 "", timerLogicClock, + sTrace("==syncNodeEqPeerHeartbeatTimer== timerLogicClock:%" PRId64 ", msgLogicClock:%" PRId64 "", timerLogicClock, msgLogicClock); } } @@ -2111,7 +2111,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) { sError("vgId:%d, sync enqueue fc-commit msg error, code:%d", ths->vgId, code); rpcFreeCont(rpcMsgLocalCmd.pCont); } else { - sTrace("vgId:%d, sync enqueue fc-commit msg, fc-index: %" PRIu64, ths->vgId, pSyncMsg->fcIndex); + sTrace("vgId:%d, sync enqueue fc-commit msg, fc-index:%" PRId64, ths->vgId, pSyncMsg->fcIndex); } } } @@ -2132,7 +2132,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) { sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, code); rpcFreeCont(rpcMsgLocalCmd.pCont); } else { - sTrace("vgId:%d, sync enqueue step-down msg, new-term: %" PRIu64, ths->vgId, pSyncMsg->sdNewTerm); + sTrace("vgId:%d, sync enqueue step-down msg, new-term: %" PRId64, ths->vgId, pSyncMsg->sdNewTerm); } } @@ -2303,7 +2303,7 @@ int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* p } if (pEntry->term < ths->pRaftStore->currentTerm) { - sNTrace(ths, "little term:%" PRIu64 ", can not do leader transfer", pEntry->term); + sNTrace(ths, "little term:%" PRId64 ", can not do leader transfer", pEntry->term); return 0; } @@ -2333,7 +2333,7 @@ int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* p int32_t ret = syncNodeRestartElectTimer(ths, electMS); ASSERT(ret == 0); - sNTrace(ths, "maybe leader transfer to %s:%d %" PRIu64, pSyncLeaderTransfer->newNodeInfo.nodeFqdn, + sNTrace(ths, "maybe leader transfer to %s:%d %" PRId64, pSyncLeaderTransfer->newNodeInfo.nodeFqdn, pSyncLeaderTransfer->newNodeInfo.nodePort, pSyncLeaderTransfer->newLeaderId.addr); } @@ -2581,7 +2581,7 @@ const char* syncTimerTypeStr(enum ESyncTimeoutType timerType) { } void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s) { - sNTrace(pSyncNode, "recv sync-timer {type:%s, lc:%" PRIu64 ", ms:%d, data:%p}, %s", + sNTrace(pSyncNode, "recv sync-timer {type:%s, lc:%" PRId64 ", ms:%d, data:%p}, %s", syncTimerTypeStr(pMsg->timeoutType), pMsg->logicClock, pMsg->timerMS, pMsg->data, s); } @@ -2589,7 +2589,7 @@ void syncLogSendRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, c char host[64]; uint16_t port; syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); - sNTrace(pSyncNode, "send sync-request-vote to %s:%d {term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64 "}, %s", + sNTrace(pSyncNode, "send sync-request-vote to %s:%d {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, %s", host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s); } @@ -2598,7 +2598,7 @@ void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, c char host[64]; uint16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - sNTrace(pSyncNode, "recv sync-request-vote from %s:%d, {term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64 "}, %s", + sNTrace(pSyncNode, "recv sync-request-vote from %s:%d, {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, %s", host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s); } @@ -2606,7 +2606,7 @@ void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteRepl char host[64]; uint16_t port; syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); - sNTrace(pSyncNode, "send sync-request-vote-reply to %s:%d {term:%" PRIu64 ", grant:%d}, %s", host, port, pMsg->term, + sNTrace(pSyncNode, "send sync-request-vote-reply to %s:%d {term:%" PRId64 ", grant:%d}, %s", host, port, pMsg->term, pMsg->voteGranted, s); } @@ -2614,7 +2614,7 @@ void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteRepl char host[64]; uint16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - sNTrace(pSyncNode, "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, %s", host, port, pMsg->term, + sNTrace(pSyncNode, "recv sync-request-vote-reply from %s:%d {term:%" PRId64 ", grant:%d}, %s", host, port, pMsg->term, pMsg->voteGranted, s); } @@ -2623,8 +2623,8 @@ void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMs uint16_t port; syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); sNTrace(pSyncNode, - "send sync-append-entries to %s:%d, {term:%" PRIu64 ", pre-index:%" PRId64 ", pre-term:%" PRIu64 - ", pterm:%" PRIu64 ", cmt:%" PRId64 ", datalen:%d}, %s", + "send sync-append-entries to %s:%d, {term:%" PRId64 ", pre-index:%" PRId64 ", pre-term:%" PRId64 + ", pterm:%" PRId64 ", cmt:%" PRId64 ", datalen:%d}, %s", host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, pMsg->commitIndex, pMsg->dataLen, s); } @@ -2635,8 +2635,8 @@ void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMs syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); sNTrace(pSyncNode, - "recv sync-append-entries from %s:%d {term:%" PRIu64 ", pre-index:%" PRIu64 ", pre-term:%" PRIu64 - ", cmt:%" PRIu64 ", pterm:%" PRIu64 ", datalen:%d}, %s", + "recv sync-append-entries from %s:%d {term:%" PRId64 ", pre-index:%" PRId64 ", pre-term:%" PRId64 + ", cmt:%" PRId64 ", pterm:%" PRId64 ", datalen:%d}, %s", host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm, pMsg->dataLen, s); } @@ -2647,8 +2647,8 @@ void syncLogSendAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntries syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); sNTrace(pSyncNode, - "send sync-append-entries-batch to %s:%d, {term:%" PRIu64 ", pre-index:%" PRId64 ", pre-term:%" PRIu64 - ", pterm:%" PRIu64 ", cmt:%" PRId64 ", datalen:%d, count:%d}, %s", + "send sync-append-entries-batch to %s:%d, {term:%" PRId64 ", pre-index:%" PRId64 ", pre-term:%" PRId64 + ", pterm:%" PRId64 ", cmt:%" PRId64 ", datalen:%d, count:%d}, %s", host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, pMsg->commitIndex, pMsg->dataLen, pMsg->dataCount, s); } @@ -2659,8 +2659,8 @@ void syncLogRecvAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntries syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); sNTrace(pSyncNode, - "recv sync-append-entries-batch from %s:%d, {term:%" PRIu64 ", pre-index:%" PRId64 ", pre-term:%" PRIu64 - ", pterm:%" PRIu64 ", cmt:%" PRId64 ", datalen:%d, count:%d}, %s", + "recv sync-append-entries-batch from %s:%d, {term:%" PRId64 ", pre-index:%" PRId64 ", pre-term:%" PRId64 + ", pterm:%" PRId64 ", cmt:%" PRId64 ", datalen:%d, count:%d}, %s", host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, pMsg->commitIndex, pMsg->dataLen, pMsg->dataCount, s); } @@ -2671,7 +2671,7 @@ void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntries syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); sNTrace(pSyncNode, - "send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64 ", success:%d, match:%" PRId64 + "send sync-append-entries-reply to %s:%d, {term:%" PRId64 ", pterm:%" PRId64 ", success:%d, match:%" PRId64 "}, %s", host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s); } @@ -2682,7 +2682,7 @@ void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntries syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); sNTrace(pSyncNode, - "recv sync-append-entries-reply from %s:%d {term:%" PRIu64 ", pterm:%" PRIu64 ", success:%d, match:%" PRId64 + "recv sync-append-entries-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64 ", success:%d, match:%" PRId64 "}, %s", host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s); } @@ -2693,7 +2693,7 @@ void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); sNTrace(pSyncNode, - "send sync-heartbeat to %s:%d {term:%" PRIu64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", pterm:%" PRIu64 + "send sync-heartbeat to %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", pterm:%" PRId64 "}, %s", host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->privateTerm, s); } @@ -2704,7 +2704,7 @@ void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); sNTrace(pSyncNode, - "recv sync-heartbeat from %s:%d {term:%" PRIu64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", pterm:%" PRIu64 + "recv sync-heartbeat from %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", pterm:%" PRId64 "}, %s", host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->privateTerm, s); } @@ -2714,7 +2714,7 @@ void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* p uint16_t port; syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); - sNTrace(pSyncNode, "send sync-heartbeat-reply from %s:%d {term:%" PRIu64 ", pterm:%" PRIu64 "}, %s", host, port, + sNTrace(pSyncNode, "send sync-heartbeat-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64 "}, %s", host, port, pMsg->term, pMsg->privateTerm, s); } @@ -2722,34 +2722,34 @@ void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* p char host[64]; uint16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - sNTrace(pSyncNode, "recv sync-heartbeat-reply from %s:%d {term:%" PRIu64 ", pterm:%" PRIu64 "}, %s", host, port, + sNTrace(pSyncNode, "recv sync-heartbeat-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64 "}, %s", host, port, pMsg->term, pMsg->privateTerm, s); } void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s) { - sNTrace(pSyncNode, "recv sync-local-cmd {cmd:%d-%s, sd-new-term:%" PRIu64 "}, %s", pMsg->cmd, - syncLocalCmdGetStr(pMsg->cmd), pMsg->sdNewTerm, s); + sNTrace(pSyncNode, "recv sync-local-cmd {cmd:%d-%s, sd-new-term:%" PRId64 ", fc-index:%" PRId64 "}, %s", pMsg->cmd, + syncLocalCmdGetStr(pMsg->cmd), pMsg->sdNewTerm, pMsg->fcIndex, s); } void syncLogSendSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s) { char host[64]; uint16_t port; syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); - sNTrace(pSyncNode, "send sync-pre-snapshot to %s:%d {term:%" PRIu64 "}, %s", host, port, pMsg->term, s); + sNTrace(pSyncNode, "send sync-pre-snapshot to %s:%d {term:%" PRId64 "}, %s", host, port, pMsg->term, s); } void syncLogRecvSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s) { char host[64]; uint16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - sNTrace(pSyncNode, "recv sync-pre-snapshot from %s:%d {term:%" PRIu64 "}, %s", host, port, pMsg->term, s); + sNTrace(pSyncNode, "recv sync-pre-snapshot from %s:%d {term:%" PRId64 "}, %s", host, port, pMsg->term, s); } void syncLogSendSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s) { char host[64]; uint16_t port; syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); - sNTrace(pSyncNode, "send sync-pre-snapshot-reply to %s:%d {term:%" PRIu64 ", snap-start:%" PRId64 "}, %s", host, port, + sNTrace(pSyncNode, "send sync-pre-snapshot-reply to %s:%d {term:%" PRId64 ", snap-start:%" PRId64 "}, %s", host, port, pMsg->term, pMsg->snapStart, s); } @@ -2757,7 +2757,7 @@ void syncLogRecvSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshot char host[64]; uint16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - sNTrace(pSyncNode, "recv sync-pre-snapshot-reply from %s:%d {term:%" PRIu64 ", snap-start:%" PRId64 "}, %s", host, + sNTrace(pSyncNode, "recv sync-pre-snapshot-reply from %s:%d {term:%" PRId64 ", snap-start:%" PRId64 "}, %s", host, port, pMsg->term, pMsg->snapStart, s); } diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index e82578ef4d2a6f4e8370e97abe15cc9f0c6ba1be..1a00b0f5a43fda87d3e250c8a02bc00cc71bc498 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -205,7 +205,7 @@ static void syncPeerState2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) { if (i < pSyncNode->replicaNum - 1) { len += snprintf(buf + len, bufLen - len, "%d:%" PRId64 " %" PRId64 ", ", i, pState->lastSendIndex, - pState->lastSendTime); + pState->lastSendTime); } else { len += snprintf(buf + len, bufLen - len, "%d:%" PRId64 " %" PRId64 "}", i, pState->lastSendIndex, pState->lastSendTime); @@ -216,6 +216,9 @@ static void syncPeerState2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) { void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNode* pNode, const char* format, ...) { if (pNode == NULL || pNode->pRaftCfg != NULL && pNode->pRaftStore == NULL || pNode->pLogStore == NULL) return; + // save error code, otherwise it will be overwritten + int32_t errCode = terrno; + SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; if (pNode->pFsm != NULL && pNode->pFsm->FpGetSnapshotInfo != NULL) { pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot); @@ -242,16 +245,21 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo int32_t writeLen = vsnprintf(eventLog, sizeof(eventLog), format, argpointer); va_end(argpointer); + int32_t aqItems = pNode->pFsm->FpApplyQueueItems(pNode->pFsm); + + // restore error code + terrno = errCode; + taosPrintLog(flags, level, dflag, "vgId:%d, sync %s " "%s" ", tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64 - ", snap-tm:%" PRIu64 ", sby:%d, stgy:%d, bch:%d, r-num:%d, lcfg:%" PRId64 + ", snap-tm:%" PRIu64 ", sby:%d, aq:%d, bch:%d, r-num:%d, lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s", pNode->vgId, syncStr(pNode->state), eventLog, pNode->pRaftStore->currentTerm, pNode->commitIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, - pNode->pRaftCfg->isStandBy, pNode->pRaftCfg->snapshotStrategy, pNode->pRaftCfg->batchSize, - pNode->replicaNum, pNode->pRaftCfg->lastConfigIndex, pNode->changing, pNode->restoreFinish, quorum, + pNode->pRaftCfg->isStandBy, aqItems, pNode->pRaftCfg->batchSize, pNode->replicaNum, + pNode->pRaftCfg->lastConfigIndex, pNode->changing, pNode->restoreFinish, quorum, pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr); } diff --git a/source/libs/sync/test/sync_test_lib/src/syncMainDebug.c b/source/libs/sync/test/sync_test_lib/src/syncMainDebug.c index 99d124580bf46351e5d396e17a879d781e77c049..5e1a9be164745aa26f1ad0a483dc0c4deca02af1 100644 --- a/source/libs/sync/test/sync_test_lib/src/syncMainDebug.c +++ b/source/libs/sync/test/sync_test_lib/src/syncMainDebug.c @@ -104,37 +104,37 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) { snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pPingTimer); cJSON_AddStringToObject(pRoot, "pPingTimer", u64buf); cJSON_AddNumberToObject(pRoot, "pingTimerMS", pSyncNode->pingTimerMS); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSyncNode->pingTimerLogicClock); + snprintf(u64buf, sizeof(u64buf), "%" PRId64, pSyncNode->pingTimerLogicClock); cJSON_AddStringToObject(pRoot, "pingTimerLogicClock", u64buf); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSyncNode->pingTimerLogicClockUser); + snprintf(u64buf, sizeof(u64buf), "%" PRId64, pSyncNode->pingTimerLogicClockUser); cJSON_AddStringToObject(pRoot, "pingTimerLogicClockUser", u64buf); snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpPingTimerCB); cJSON_AddStringToObject(pRoot, "FpPingTimerCB", u64buf); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSyncNode->pingTimerCounter); + snprintf(u64buf, sizeof(u64buf), "%" PRId64, pSyncNode->pingTimerCounter); cJSON_AddStringToObject(pRoot, "pingTimerCounter", u64buf); // elect timer snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pElectTimer); cJSON_AddStringToObject(pRoot, "pElectTimer", u64buf); cJSON_AddNumberToObject(pRoot, "electTimerMS", pSyncNode->electTimerMS); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSyncNode->electTimerLogicClock); + snprintf(u64buf, sizeof(u64buf), "%" PRId64, pSyncNode->electTimerLogicClock); cJSON_AddStringToObject(pRoot, "electTimerLogicClock", u64buf); snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpElectTimerCB); cJSON_AddStringToObject(pRoot, "FpElectTimerCB", u64buf); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSyncNode->electTimerCounter); + snprintf(u64buf, sizeof(u64buf), "%" PRId64, pSyncNode->electTimerCounter); cJSON_AddStringToObject(pRoot, "electTimerCounter", u64buf); // heartbeat timer snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pHeartbeatTimer); cJSON_AddStringToObject(pRoot, "pHeartbeatTimer", u64buf); cJSON_AddNumberToObject(pRoot, "heartbeatTimerMS", pSyncNode->heartbeatTimerMS); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSyncNode->heartbeatTimerLogicClock); + snprintf(u64buf, sizeof(u64buf), "%" PRId64, pSyncNode->heartbeatTimerLogicClock); cJSON_AddStringToObject(pRoot, "heartbeatTimerLogicClock", u64buf); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSyncNode->heartbeatTimerLogicClockUser); + snprintf(u64buf, sizeof(u64buf), "%" PRId64, pSyncNode->heartbeatTimerLogicClockUser); cJSON_AddStringToObject(pRoot, "heartbeatTimerLogicClockUser", u64buf); snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpHeartbeatTimerCB); cJSON_AddStringToObject(pRoot, "FpHeartbeatTimerCB", u64buf); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSyncNode->heartbeatTimerCounter); + snprintf(u64buf, sizeof(u64buf), "%" PRId64, pSyncNode->heartbeatTimerCounter); cJSON_AddStringToObject(pRoot, "heartbeatTimerCounter", u64buf); // callback @@ -195,7 +195,7 @@ inline char* syncNode2SimpleStr(const SSyncNode* pSyncNode) { SyncIndex logBeginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore); snprintf(s, len, - "vgId:%d, sync %s, tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", snap:%" PRId64 + "vgId:%d, sync %s, tm:%" PRId64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", snap:%" PRId64 ", sby:%d, " "r-num:%d, " "lcfg:%" PRId64 ", chging:%d, rsto:%d",