From 15ac89590c215eb9568b3141a6c80f1a699bb3a6 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 16 Jun 2022 17:41:01 +0800 Subject: [PATCH] refactor(sync): add trace log --- source/libs/sync/src/syncAppendEntries.c | 8 +- source/libs/sync/src/syncAppendEntriesReply.c | 16 ++- source/libs/sync/src/syncCommit.c | 6 +- source/libs/sync/src/syncMain.c | 72 ++++++----- source/libs/sync/src/syncRaftLog.c | 41 +++--- source/libs/sync/src/syncRespMgr.c | 21 +-- source/libs/sync/src/syncSnapshot.c | 120 ++++++++++-------- source/libs/sync/src/syncUtil.c | 20 ++- 8 files changed, 174 insertions(+), 130 deletions(-) diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 89b4761212..bea51b2dd3 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -713,8 +713,8 @@ static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) { // delete confict entries code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin); ASSERT(code == 0); - sDebug("vgId:%d sync event currentTerm:%lu log truncate, from %ld to %ld", ths->vgId, ths->pRaftStore->currentTerm, - delBegin, delEnd); + sDebug("vgId:%d sync event %s currentTerm:%lu log truncate, from %ld to %ld", ths->vgId, + syncUtilState2String(ths->state), ths->pRaftStore->currentTerm, delBegin, delEnd); logStoreSimpleLog2("after syncNodeMakeLogSame", ths->pLogStore); return code; @@ -995,8 +995,8 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs SyncIndex commitEnd = snapshot.lastApplyIndex; ths->commitIndex = snapshot.lastApplyIndex; - sDebug("vgId:%d sync event currentTerm:%lu commit by snapshot from index:%ld to index:%ld, %s", ths->vgId, - ths->pRaftStore->currentTerm, commitBegin, commitEnd, syncUtilState2String(ths->state)); + sDebug("vgId:%d sync event %s currentTerm:%lu commit by snapshot from index:%ld to index:%ld", ths->vgId, + syncUtilState2String(ths->state), ths->pRaftStore->currentTerm, commitBegin, commitEnd); } SyncIndex beginIndex = ths->commitIndex + 1; diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 5290d7d28e..94fdcc2772 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -190,19 +190,23 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries if (gRaftDetailLog) { char* s = snapshotSender2Str(pSender); sDebug( - "vgId:%d sync event currentTerm:%lu snapshot send to %s:%d start sender first time, lastApplyIndex:%ld " + "vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d start sender first time, " + "lastApplyIndex:%ld " "lastApplyTerm:%lu " "lastConfigIndex:%ld privateTerm:%lu " "sender:%s", - ths->vgId, ths->pRaftStore->currentTerm, host, port, pSender->snapshot.lastApplyIndex, - pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm, s); + ths->vgId, syncUtilState2String(ths->state), ths->pRaftStore->currentTerm, host, port, + pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, + pSender->privateTerm, s); taosMemoryFree(s); } else { sDebug( - "vgId:%d sync event currentTerm:%lu snapshot send to %s:%d start sender first time, lastApplyIndex:%ld " + "vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d start sender first time, " + "lastApplyIndex:%ld " "lastApplyTerm:%lu lastConfigIndex:%ld privateTerm:%lu", - ths->vgId, ths->pRaftStore->currentTerm, host, port, pSender->snapshot.lastApplyIndex, - pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm); + ths->vgId, syncUtilState2String(ths->state), ths->pRaftStore->currentTerm, host, port, + pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, + pSender->privateTerm); } } diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index efdb5b24ce..e3431307fd 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -56,9 +56,9 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { SyncIndex commitEnd = snapshot.lastApplyIndex; pSyncNode->commitIndex = snapshot.lastApplyIndex; - sDebug("vgId:%d sync event currentTerm:%lu commit by snapshot from index:%ld to index:%ld, %s", pSyncNode->vgId, - pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, snapshot.lastApplyIndex, - syncUtilState2String(pSyncNode->state)); + sDebug("vgId:%d sync event %s currentTerm:%lu commit by snapshot from index:%ld to index:%ld", pSyncNode->vgId, + syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, + snapshot.lastApplyIndex); } // update commit index diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index cc2c82fa5f..57481ee3a5 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -573,8 +573,9 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { return -1; } assert(rid == pSyncNode->rid); - sDebug("vgId:%d sync event currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId, - pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pMsg->msgType), pMsg->msgType); + sDebug("vgId:%d sync event %s currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId, + syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pMsg->msgType), + pMsg->msgType); ret = syncNodePropose(pSyncNode, pMsg, isWeak); taosReleaseRef(tsNodeRefId, pSyncNode->rid); @@ -583,8 +584,9 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak) { int32_t ret = 0; - sDebug("vgId:%d sync event currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId, - pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pMsg->msgType), pMsg->msgType); + sDebug("vgId:%d sync event %s currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId, + syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pMsg->msgType), + pMsg->msgType); if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { SRespStub stub; @@ -830,7 +832,8 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { // snapshot meta // pSyncNode->sMeta.lastConfigIndex = -1; - sDebug("vgId:%d sync event currentTerm:%lu sync open", pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm); + sDebug("vgId:%d sync event %s currentTerm:%lu sync open", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), + pSyncNode->pRaftStore->currentTerm); return pSyncNode; } @@ -877,7 +880,8 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) { } void syncNodeClose(SSyncNode* pSyncNode) { - sDebug("vgId:%d sync event currentTerm:%lu sync close", pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm); + sDebug("vgId:%d sync event %s currentTerm:%lu sync close", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), + pSyncNode->pRaftStore->currentTerm); int32_t ret; assert(pSyncNode != NULL); @@ -1316,8 +1320,9 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA]; for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { oldSenders[i] = (pSyncNode->senders)[i]; - sDebug("vgId:%d sync event currentTerm:%lu save senders %d, %p, privateTerm:%lu", pSyncNode->vgId, - pSyncNode->pRaftStore->currentTerm, i, oldSenders[i], oldSenders[i]->privateTerm); + sDebug("vgId:%d sync event %s currentTerm:%lu save senders %d, %p, privateTerm:%lu", pSyncNode->vgId, + syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, i, oldSenders[i], + oldSenders[i]->privateTerm); if (gRaftDetailLog) { ; } @@ -1369,9 +1374,9 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex char host[128]; uint16_t port; syncUtilU642Addr((pSyncNode->replicasId)[i].addr, host, sizeof(host), &port); - sDebug("vgId:%d sync event currentTerm:%lu reset sender for %lu, newIndex:%d, %s:%d, %p, privateTerm:%lu", - pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, (pSyncNode->replicasId)[i].addr, i, host, port, - oldSenders[j], oldSenders[j]->privateTerm); + sDebug("vgId:%d sync event %s currentTerm:%lu reset sender for %lu, newIndex:%d, %s:%d, %p, privateTerm:%lu", + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, + (pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[j], oldSenders[j]->privateTerm); (pSyncNode->senders)[i] = oldSenders[j]; oldSenders[j] = NULL; reset = true; @@ -1379,9 +1384,9 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex // reset replicaIndex int32_t oldreplicaIndex = (pSyncNode->senders)[i]->replicaIndex; (pSyncNode->senders)[i]->replicaIndex = i; - sDebug("vgId:%d sync event currentTerm:%lu udpate replicaIndex from %d to %d, %s:%d, %p, reset:%d", - pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, oldreplicaIndex, i, host, port, - (pSyncNode->senders)[i], reset); + sDebug("vgId:%d sync event %s currentTerm:%lu udpate replicaIndex from %d to %d, %s:%d, %p, reset:%d", + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, + oldreplicaIndex, i, host, port, (pSyncNode->senders)[i], reset); } } } @@ -1390,9 +1395,9 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { if ((pSyncNode->senders)[i] == NULL) { (pSyncNode->senders)[i] = snapshotSenderCreate(pSyncNode, i); - sDebug("vgId:%d sync event currentTerm:%lu create new sender %p replicaIndex:%d, privateTerm:%lu", - pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, (pSyncNode->senders)[i], i, - (pSyncNode->senders)[i]->privateTerm); + sDebug("vgId:%d sync event %s currentTerm:%lu create new sender %p replicaIndex:%d, privateTerm:%lu", + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, + (pSyncNode->senders)[i], i, (pSyncNode->senders)[i]->privateTerm); } } @@ -1400,8 +1405,8 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { if (oldSenders[i] != NULL) { snapshotSenderDestroy(oldSenders[i]); - sDebug("vgId:%d sync event currentTerm:%lu delete old sender %p replicaIndex:%d", pSyncNode->vgId, - pSyncNode->pRaftStore->currentTerm, oldSenders[i], i); + sDebug("vgId:%d sync event %s currentTerm:%lu delete old sender %p replicaIndex:%d", pSyncNode->vgId, + syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, oldSenders[i], i); oldSenders[i] = NULL; } } @@ -1472,10 +1477,10 @@ void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) { void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { sDebug( - "vgId:%d sync event currentTerm:%lu become follower, isStandBy:%d, replicaNum:%d, " + "vgId:%d sync event %s currentTerm:%lu become follower, isStandBy:%d, replicaNum:%d, " "restoreFinish:%d, %s", - pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, - pSyncNode->restoreFinish, debugStr); + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, + pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->restoreFinish, debugStr); // maybe clear leader cache if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { @@ -1512,9 +1517,9 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) { // reset restoreFinish pSyncNode->restoreFinish = false; - sDebug("vgId:%d sync event currentTerm:%lu become leader, isStandBy:%d, replicaNum:%d, restoreFinish:%d, %s", - pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, - pSyncNode->restoreFinish, debugStr); + sDebug("vgId:%d sync event %s currentTerm:%lu become leader, isStandBy:%d, replicaNum:%d, restoreFinish:%d, %s", + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, + pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->restoreFinish, debugStr); // state change pSyncNode->state = TAOS_SYNC_STATE_LEADER; @@ -2088,12 +2093,13 @@ const char* syncStr(ESyncState state) { static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) { SyncLeaderTransfer* pSyncLeaderTransfer = syncLeaderTransferFromRpcMsg2(pRpcMsg); - sDebug("vgId:%d sync event currentTerm:%lu begin leader transfer", ths->vgId, ths->pRaftStore->currentTerm); + sDebug("vgId:%d sync event %s currentTerm:%lu begin leader transfer", ths->vgId, syncUtilState2String(ths->state), + ths->pRaftStore->currentTerm); if (strcmp(pSyncLeaderTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 && pSyncLeaderTransfer->newNodeInfo.nodePort == ths->myNodeInfo.nodePort) { - sDebug("vgId:%d sync event currentTerm:%lu maybe leader transfer to %s:%d %lu", ths->vgId, - ths->pRaftStore->currentTerm, pSyncLeaderTransfer->newNodeInfo.nodeFqdn, + sDebug("vgId:%d sync event %s currentTerm:%lu maybe leader transfer to %s:%d %lu", ths->vgId, + syncUtilState2String(ths->state), ths->pRaftStore->currentTerm, pSyncLeaderTransfer->newNodeInfo.nodeFqdn, pSyncLeaderTransfer->newNodeInfo.nodePort, pSyncLeaderTransfer->newLeaderId.addr); // reset elect timer now! @@ -2216,8 +2222,9 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) { int32_t code = 0; ESyncState state = flag; - sDebug("vgId:%d sync event currentTerm:%lu commit by wal from index:%" PRId64 " to index:%" PRId64 ", %s", ths->vgId, - ths->pRaftStore->currentTerm, beginIndex, endIndex, syncUtilState2String(state)); + sDebug("vgId:%d sync event %s currentTerm:%lu commit by wal from index:%" PRId64 " to index:%" PRId64 ", %s", + ths->vgId, syncUtilState2String(ths->state), ths->pRaftStore->currentTerm, beginIndex, endIndex, + syncUtilState2String(state)); // execute fsm if (ths->pFsm != NULL) { @@ -2265,8 +2272,9 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, ths->pFsm->FpRestoreFinishCb(ths->pFsm); } ths->restoreFinish = true; - sDebug("vgId:%d sync event currentTerm:%lu restore finish, %s, index:%ld", ths->vgId, - ths->pRaftStore->currentTerm, syncUtilState2String(ths->state), pEntry->index); + sDebug("vgId:%d sync event %s currentTerm:%lu restore finish, %s, index:%ld", ths->vgId, + syncUtilState2String(ths->state), ths->pRaftStore->currentTerm, syncUtilState2String(ths->state), + pEntry->index); } } diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 31ac44fa81..308edc0b93 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -163,9 +163,9 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr walFsync(pWal, true); - sDebug("vgId:%d sync event currentTerm:%lu write index:%ld, %s, isStandBy:%d, msgType:%s,%d, originalRpcType:%s,%d", - pData->pSyncNode->vgId, pData->pSyncNode->pRaftStore->currentTerm, pEntry->index, - syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->pRaftCfg->isStandBy, + sDebug("vgId:%d sync event %s currentTerm:%lu write index:%ld, isStandBy:%d, msgType:%s,%d, originalRpcType:%s,%d", + pData->pSyncNode->vgId, syncUtilState2String(pData->pSyncNode->state), + pData->pSyncNode->pRaftStore->currentTerm, pEntry->index, pData->pSyncNode->pRaftCfg->isStandBy, TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType); return code; @@ -323,10 +323,11 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { walFsync(pWal, true); sDebug( - "vgId:%d sync event currentTerm:%lu old write index:%ld, %s, isStandBy:%d, msgType:%s,%d, originalRpcType:%s,%d", - pData->pSyncNode->vgId, pData->pSyncNode->pRaftStore->currentTerm, pEntry->index, - syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->pRaftCfg->isStandBy, TMSG_INFO(pEntry->msgType), - pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType); + "vgId:%d sync event %s currentTerm:%lu old write index:%ld, isStandBy:%d, msgType:%s,%d, " + "originalRpcType:%s,%d", + pData->pSyncNode->vgId, syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->pRaftStore->currentTerm, + pEntry->index, pData->pSyncNode->pRaftCfg->isStandBy, TMSG_INFO(pEntry->msgType), pEntry->msgType, + TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType); return code; } @@ -407,18 +408,20 @@ SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) { } int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) { - SSyncLogStoreData* pData = pLogStore->data; - SWal* pWal = pData->pWal; - // assert(walCommit(pWal, index) == 0); - int32_t code = walCommit(pWal, index); - if (code != 0) { - int32_t err = terrno; - const char* errStr = tstrerror(err); - int32_t linuxErr = errno; - const char* linuxErrMsg = strerror(errno); - sError("walCommit error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr, linuxErrMsg); - ASSERT(0); - } + /* + SSyncLogStoreData* pData = pLogStore->data; + SWal* pWal = pData->pWal; + // assert(walCommit(pWal, index) == 0); + int32_t code = walCommit(pWal, index); + if (code != 0) { + int32_t err = terrno; + const char* errStr = tstrerror(err); + int32_t linuxErr = errno; + const char* linuxErrMsg = strerror(errno); + sError("walCommit error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr, + linuxErrMsg); ASSERT(0); + } + */ return 0; } diff --git a/source/libs/sync/src/syncRespMgr.c b/source/libs/sync/src/syncRespMgr.c index 5b127793d6..cc4004a6bd 100644 --- a/source/libs/sync/src/syncRespMgr.c +++ b/source/libs/sync/src/syncRespMgr.c @@ -46,9 +46,10 @@ int64_t syncRespMgrAdd(SSyncRespMgr *pObj, SRespStub *pStub) { taosHashPut(pObj->pRespHash, &keyCode, sizeof(keyCode), pStub, sizeof(SRespStub)); SSyncNode *pSyncNode = pObj->data; - sDebug("vgId:%d sync event currentTerm:%lu resp mgr add, msgType:%s,%d seq:%lu handle:%p ahandle:%p", pSyncNode->vgId, - pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, keyCode, - pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle); + sDebug("vgId:%d sync event %s currentTerm:%lu resp mgr add, msgType:%s,%d seq:%lu handle:%p ahandle:%p", + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, + TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, keyCode, pStub->rpcMsg.info.handle, + pStub->rpcMsg.info.ahandle); taosThreadMutexUnlock(&(pObj->mutex)); return keyCode; @@ -71,9 +72,10 @@ int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub) { memcpy(pStub, pTmp, sizeof(SRespStub)); SSyncNode *pSyncNode = pObj->data; - sDebug("vgId:%d sync event currentTerm:%lu resp mgr get, msgType:%s,%d seq:%lu handle:%p ahandle:%p", - pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, - index, pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle); + sDebug("vgId:%d sync event %s currentTerm:%lu resp mgr get, msgType:%s,%d seq:%lu handle:%p ahandle:%p", + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, + TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, index, pStub->rpcMsg.info.handle, + pStub->rpcMsg.info.ahandle); taosThreadMutexUnlock(&(pObj->mutex)); return 1; // get one object @@ -90,9 +92,10 @@ int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStu memcpy(pStub, pTmp, sizeof(SRespStub)); SSyncNode *pSyncNode = pObj->data; - sDebug("vgId:%d sync event currentTerm:%lu resp mgr get and del, msgType:%s,%d seq:%lu handle:%p ahandle:%p", - pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, - index, pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle); + sDebug("vgId:%d sync event %s currentTerm:%lu resp mgr get and del, msgType:%s,%d seq:%lu handle:%p ahandle:%p", + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, + TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, index, pStub->rpcMsg.info.handle, + pStub->rpcMsg.info.ahandle); taosHashRemove(pObj->pRespHash, &index, sizeof(index)); taosThreadMutexUnlock(&(pObj->mutex)); diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index d3785060d4..219a1a5a7d 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -141,20 +141,22 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); sDebug( - "vgId:%d sync event currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld " + "vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld " "lastApplyTerm:%lu " "lastConfigIndex:%ld privateTerm:%lu send " "msg:%s", - pSender->pSyncNode->vgId, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, + pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), + pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm, msgStr); taosMemoryFree(msgStr); } else { sDebug( - "vgId:%d sync event currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld " + "vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld " "lastApplyTerm:%lu " "lastConfigIndex:%ld privateTerm:%lu", - pSender->pSyncNode->vgId, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, + pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), + pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm); } @@ -283,29 +285,32 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); sDebug( - "vgId:%d sync event currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld " + "vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld " "lastApplyTerm:%lu " "lastConfigIndex:%ld privateTerm:%lu send " "msg:%s", - pSender->pSyncNode->vgId, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, + pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), + pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm, msgStr); taosMemoryFree(msgStr); } else { sDebug( - "vgId:%d sync event currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld " + "vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld " "lastApplyTerm:%lu " "lastConfigIndex:%ld privateTerm:%lu", - pSender->pSyncNode->vgId, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, + pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), + pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm); } } else { sDebug( - "vgId:%d sync event currentTerm:%lu snapshot send to %s:%d sending seq:%d ack:%d lastApplyIndex:%ld " + "vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d sending seq:%d ack:%d lastApplyIndex:%ld " "lastApplyTerm:%lu " "lastConfigIndex:%ld privateTerm:%lu", - pSender->pSyncNode->vgId, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, + pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), + pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm); } @@ -339,14 +344,16 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); sDebug( - "vgId:%d sync event currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d privateTerm:%lu send msg:%s", - pSender->pSyncNode->vgId, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, - pSender->privateTerm, msgStr); + "vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d privateTerm:%lu send " + "msg:%s", + pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), + pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->privateTerm, + msgStr); taosMemoryFree(msgStr); } else { - sDebug("vgId:%d sync event currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d privateTerm:%lu", - pSender->pSyncNode->vgId, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, - pSender->ack, pSender->privateTerm); + sDebug("vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d privateTerm:%lu", + pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), + pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->privateTerm); } syncSnapshotSendDestroy(pMsg); @@ -579,17 +586,19 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); sDebug( - "vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu, " + "vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, " + "lastTerm:%lu, " "lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s", - pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, - pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr); + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, host, port, + pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr); taosMemoryFree(msgStr); } else { sDebug( - "vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu, " + "vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, " + "lastTerm:%lu, " "lastConfigIndex:%ld privateTerm:%lu", - pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, - pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm); + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, host, port, + pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm); } } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) { @@ -626,18 +635,18 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { bool isDrop; if (IamInNew) { sDebug( - "vgId:%d sync event currentTerm:%lu update config by snapshot, lastIndex:%ld, lastTerm:%lu, " + "vgId:%d sync event %s currentTerm:%lu update config by snapshot, lastIndex:%ld, lastTerm:%lu, " "lastConfigIndex:%ld ", - pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pMsg->lastIndex, pMsg->lastTerm, - pMsg->lastConfigIndex); + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, + pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex); syncNodeUpdateConfig(pSyncNode, &newSyncCfg, pMsg->lastConfigIndex, &isDrop); } else { sDebug( - "vgId:%d sync event currentTerm:%lu do not update config by snapshot, I am not in newCfg, " + "vgId:%d sync event %s currentTerm:%lu do not update config by snapshot, I am not in newCfg, " "lastIndex:%ld, lastTerm:%lu, " "lastConfigIndex:%ld ", - pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pMsg->lastIndex, pMsg->lastTerm, - pMsg->lastConfigIndex); + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, + pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex); } // change isStandBy to normal @@ -662,20 +671,21 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { if (gRaftDetailLog) { char *logSimpleStr = logStoreSimple2Str(pSyncNode->pLogStore); sDebug( - "vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d finish, update log begin index:%ld, " + "vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d finish, update log begin index:%ld, " "snapshot.lastApplyIndex:%ld, " "snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, privateTerm:%lu, raft log:%s", - pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, host, port, pMsg->lastIndex + 1, - snapshot.lastApplyIndex, snapshot.lastApplyTerm, snapshot.lastConfigIndex, pReceiver->privateTerm, - logSimpleStr); + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, host, port, + pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm, snapshot.lastConfigIndex, + pReceiver->privateTerm, logSimpleStr); taosMemoryFree(logSimpleStr); } else { sDebug( - "vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d finish, update log begin index:%ld, " + "vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d finish, update log begin index:%ld, " "snapshot.lastApplyIndex:%ld, " "snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, privateTerm:%lu", - pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, host, port, pMsg->lastIndex + 1, - snapshot.lastApplyIndex, snapshot.lastApplyTerm, snapshot.lastConfigIndex, pReceiver->privateTerm); + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, host, port, + pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm, snapshot.lastConfigIndex, + pReceiver->privateTerm); } pReceiver->pWriter = NULL; @@ -686,17 +696,19 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); sDebug( - "vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, " + "vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, " "lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s", - pReceiver->pSyncNode->vgId, pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, - pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr); + pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), + pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, + pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr); taosMemoryFree(msgStr); } else { sDebug( - "vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, " + "vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, " "lastConfigIndex:%ld, privateTerm:%lu", - pReceiver->pSyncNode->vgId, pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, - pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm); + pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), + pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, + pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm); } } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) { @@ -711,20 +723,22 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); sDebug( - "vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, " + "vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, " "lastTerm:%lu, " "lastConfigIndex:%ld, privateTerm:%lu, recv " "msg:%s", - pReceiver->pSyncNode->vgId, pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, - pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr); + pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), + pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, + pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr); taosMemoryFree(msgStr); } else { sDebug( - "vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, " + "vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, " "lastTerm:%lu, " "lastConfigIndex:%ld, privateTerm:%lu", - pReceiver->pSyncNode->vgId, pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, - pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm); + pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), + pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, + pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm); } } else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) { @@ -744,19 +758,19 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); sDebug( - "vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, " + "vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, " "lastTerm:%lu, " "lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s", - pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, - pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr); + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, host, port, + pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr); taosMemoryFree(msgStr); } else { sDebug( - "vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, " + "vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, " "lastTerm:%lu, " "lastConfigIndex:%ld, privateTerm:%lu", - pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, - pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm); + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, host, port, + pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm); } } else { diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 4026596548..cbc1298113 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -168,14 +168,26 @@ char* syncUtilRaftId2Str(const SRaftId* p) { } const char* syncUtilState2String(ESyncState state) { + /* + if (state == TAOS_SYNC_STATE_FOLLOWER) { + return "TAOS_SYNC_STATE_FOLLOWER"; + } else if (state == TAOS_SYNC_STATE_CANDIDATE) { + return "TAOS_SYNC_STATE_CANDIDATE"; + } else if (state == TAOS_SYNC_STATE_LEADER) { + return "TAOS_SYNC_STATE_LEADER"; + } else { + return "TAOS_SYNC_STATE_UNKNOWN"; + } + */ + if (state == TAOS_SYNC_STATE_FOLLOWER) { - return "TAOS_SYNC_STATE_FOLLOWER"; + return "follower"; } else if (state == TAOS_SYNC_STATE_CANDIDATE) { - return "TAOS_SYNC_STATE_CANDIDATE"; + return "candidate"; } else if (state == TAOS_SYNC_STATE_LEADER) { - return "TAOS_SYNC_STATE_LEADER"; + return "leader"; } else { - return "TAOS_SYNC_STATE_UNKNOWN"; + return "state_error"; } } -- GitLab