diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 525681e53e7f94a2453e756bc7969bd255a7c0a1..9db04ce69853587e232480e1dcb686aab132a65c 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -17,6 +17,7 @@ #include "syncUtil.h" #include "syncIndexMgr.h" #include "syncMessage.h" +#include "syncPipeline.h" #include "syncRaftCfg.h" #include "syncRaftStore.h" #include "syncSnapshot.h" @@ -163,45 +164,67 @@ bool syncUtilUserPreCommit(tmsg_t msgType) { return msgType != TDMT_SYNC_NOOP && bool syncUtilUserRollback(tmsg_t msgType) { return msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_LEADER_TRANSFER; } void syncCfg2SimpleStr(const SSyncCfg* pCfg, char* buf, int32_t bufLen) { - int32_t len = snprintf(buf, bufLen, "{r-num:%d, my:%d, ", pCfg->replicaNum, pCfg->myIndex); - + int32_t len = snprintf(buf, bufLen, "{num:%d, idx:%d, [", pCfg->replicaNum, pCfg->myIndex); for (int32_t i = 0; i < pCfg->replicaNum; ++i) { + len += snprintf(buf + len, bufLen - len, "%s:%d", pCfg->nodeInfo[i].nodeFqdn, pCfg->nodeInfo[i].nodePort); if (i < pCfg->replicaNum - 1) { - len += snprintf(buf + len, bufLen - len, "%s:%d, ", pCfg->nodeInfo[i].nodeFqdn, pCfg->nodeInfo[i].nodePort); - } else { - len += snprintf(buf + len, bufLen - len, "%s:%d}", pCfg->nodeInfo[i].nodeFqdn, pCfg->nodeInfo[i].nodePort); + len += snprintf(buf + len, bufLen - len, "%s", ", "); } } + len += snprintf(buf + len, bufLen - len, "%s", "]}"); } // for leader static void syncHearbeatReplyTime2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) { - int32_t len = 5; - + int32_t len = 0; + len += snprintf(buf + len, bufLen - len, "%s", "{"); for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) { int64_t tsMs = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->replicasId[i])); - + len += snprintf(buf + len, bufLen - len, "%d:%" PRId64, i, tsMs); if (i < pSyncNode->replicaNum - 1) { - len += snprintf(buf + len, bufLen - len, "%d:%" PRId64 ",", i, tsMs); - } else { - len += snprintf(buf + len, bufLen - len, "%d:%" PRId64 "}", i, tsMs); + len += snprintf(buf + len, bufLen - len, "%s", ","); } } + len += snprintf(buf + len, bufLen - len, "%s", "}"); } // for follower static void syncHearbeatTime2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) { - int32_t len = 4; - + int32_t len = 0; + len += snprintf(buf + len, bufLen - len, "%s", "{"); for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) { int64_t tsMs = syncIndexMgrGetRecvTime(pSyncNode->pNextIndex, &(pSyncNode->replicasId[i])); - + len += snprintf(buf + len, bufLen - len, "%d:%" PRId64, i, tsMs); if (i < pSyncNode->replicaNum - 1) { - len += snprintf(buf + len, bufLen - len, "%d:%" PRId64 ",", i, tsMs); - } else { - len += snprintf(buf + len, bufLen - len, "%d:%" PRId64 "}", i, tsMs); + len += snprintf(buf + len, bufLen - len, "%s", ","); + } + } + len += snprintf(buf + len, bufLen - len, "%s", "}"); +} + +static void syncLogBufferStates2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) { + SSyncLogBuffer* pBuf = pSyncNode->pLogBuf; + if (pBuf == NULL) { + return; + } + int len = 0; + len += snprintf(buf + len, bufLen - len, "[%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pBuf->startIndex, + pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); +} + +static void syncLogReplMgrStates2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) { + int len = 0; + len += snprintf(buf + len, bufLen - len, "%s", "{"); + for (int32_t i = 0; i < pSyncNode->replicaNum; i++) { + SSyncLogReplMgr* pMgr = pSyncNode->logReplMgrs[i]; + if (pMgr == NULL) break; + len += snprintf(buf + len, bufLen - len, "%d:%d [%" PRId64 " %" PRId64 ", %" PRId64 ")", i, pMgr->restored, + pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex); + if (i + 1 < pSyncNode->replicaNum) { + len += snprintf(buf + len, bufLen - len, "%s", ", "); } } + len += snprintf(buf + len, bufLen - len, "%s", "}"); } static void syncPeerState2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) { @@ -243,20 +266,23 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo int32_t cacheHit = pNode->pLogStore->cacheHit; int32_t cacheMiss = pNode->pLogStore->cacheMiss; - char cfgStr[1024]; + char cfgStr[1024] = ""; if (pNode->pRaftCfg != NULL) { syncCfg2SimpleStr(&(pNode->pRaftCfg->cfg), cfgStr, sizeof(cfgStr)); } else { return; } - char peerStr[1024] = "{"; - syncPeerState2Str(pNode, peerStr, sizeof(peerStr)); + char replMgrStatesStr[1024] = ""; + syncLogReplMgrStates2Str(pNode, replMgrStatesStr, sizeof(replMgrStatesStr)); + + char bufferStatesStr[256] = ""; + syncLogBufferStates2Str(pNode, bufferStatesStr, sizeof(bufferStatesStr)); - char hbrTimeStr[256] = "hbr:{"; + char hbrTimeStr[256] = ""; syncHearbeatReplyTime2Str(pNode, hbrTimeStr, sizeof(hbrTimeStr)); - char hbTimeStr[256] = "hb:{"; + char hbTimeStr[256] = ""; syncHearbeatTime2Str(pNode, hbTimeStr, sizeof(hbTimeStr)); int32_t quorum = syncNodeDynamicQuorum(pNode); @@ -279,16 +305,16 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo taosPrintLog(flags, level, dflag, "vgId:%d, %s, sync:%s, term:%" PRIu64 ", commit-index:%" PRId64 ", first-ver:%" PRId64 ", last-ver:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64 ", snap-term:%" PRIu64 - ", elect-times:%d, as-leader-times:%d, cfg-ch-times:%d, hit:%d, mis:%d, hb-slow:%d, hbr-slow:%d, " + ", elect-times:%d, as-leader-times:%d, cfg-ch-times:%d, hb-slow:%d, hbr-slow:%d, " "aq-items:%d, snaping:%" PRId64 ", replicas:%d, last-cfg:%" PRId64 - ", chging:%d, restore:%d, quorum:%d, elect-lc-timer:%" PRId64 ", hb:%" PRId64 ", %s, %s, %s, %s", + ", chging:%d, restore:%d, quorum:%d, elect-lc-timer:%" PRId64 ", hb:%" PRId64 + ", buffer:%s, log-repl-mgrs:%s, members:%s, hb:%s, hb-reply:%s", pNode->vgId, eventLog, syncStr(pNode->state), currentTerm, pNode->commitIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->electNum, - pNode->becomeLeaderNum, pNode->configChangeNum, cacheHit, cacheMiss, pNode->hbSlowNum, - pNode->hbrSlowNum, aqItems, pNode->snapshottingIndex, pNode->replicaNum, - pNode->pRaftCfg->lastConfigIndex, pNode->changing, pNode->restoreFinish, quorum, - pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr, hbTimeStr, - hbrTimeStr); + pNode->becomeLeaderNum, pNode->configChangeNum, pNode->hbSlowNum, pNode->hbrSlowNum, aqItems, + pNode->snapshottingIndex, pNode->replicaNum, pNode->pRaftCfg->lastConfigIndex, pNode->changing, + pNode->restoreFinish, quorum, pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, + bufferStatesStr, replMgrStatesStr, cfgStr, hbTimeStr, hbrTimeStr); } }