diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index bea51b2dd39c193aec3e4c607f09d3070e4a5434..1235e8eda6a7ed9fbdc6cf6ea1d71e1ff4ced535 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -713,8 +713,8 @@ static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) { // delete confict entries code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin); ASSERT(code == 0); - sDebug("vgId:%d sync event %s currentTerm:%lu log truncate, from %ld to %ld", ths->vgId, - syncUtilState2String(ths->state), ths->pRaftStore->currentTerm, delBegin, delEnd); + sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu log truncate, from %ld to %ld", ths->vgId, + syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, delBegin, delEnd); logStoreSimpleLog2("after syncNodeMakeLogSame", ths->pLogStore); return code; @@ -995,8 +995,10 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs SyncIndex commitEnd = snapshot.lastApplyIndex; ths->commitIndex = snapshot.lastApplyIndex; - sDebug("vgId:%d sync event %s currentTerm:%lu commit by snapshot from index:%ld to index:%ld", ths->vgId, - syncUtilState2String(ths->state), ths->pRaftStore->currentTerm, commitBegin, commitEnd); + sDebug( + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu commit by snapshot from index:%ld to index:%ld", + ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, + commitBegin, commitEnd); } SyncIndex beginIndex = ths->commitIndex + 1; diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 94fdcc2772c66a354455632e00c83eac576cbf5e..f934f9a268d68635e0ca51bdeb49a6904856aa22 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -190,21 +190,21 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries if (gRaftDetailLog) { char* s = snapshotSender2Str(pSender); sDebug( - "vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d start sender first time, " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d start sender first time, " "lastApplyIndex:%ld " "lastApplyTerm:%lu " "lastConfigIndex:%ld privateTerm:%lu " "sender:%s", - ths->vgId, syncUtilState2String(ths->state), ths->pRaftStore->currentTerm, host, port, + ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, host, port, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm, s); taosMemoryFree(s); } else { sDebug( - "vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d start sender first time, " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d start sender first time, " "lastApplyIndex:%ld " "lastApplyTerm:%lu lastConfigIndex:%ld privateTerm:%lu", - ths->vgId, syncUtilState2String(ths->state), ths->pRaftStore->currentTerm, host, port, + ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, host, port, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm); } diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index e3431307fde21e7f40c65e1f5faebb9c1213fb29..d010728c78210c3c81d1b40ad4dac3adfd707f38 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -56,9 +56,9 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { SyncIndex commitEnd = snapshot.lastApplyIndex; pSyncNode->commitIndex = snapshot.lastApplyIndex; - sDebug("vgId:%d sync event %s currentTerm:%lu commit by snapshot from index:%ld to index:%ld", pSyncNode->vgId, - syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, - snapshot.lastApplyIndex); + sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu commit by snapshot from index:%ld to index:%ld", + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, snapshot.lastApplyIndex); } // update commit index diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index a209290fe47e2a74d5ad84529fd59548c24a9b6c..343a0e55977ea63e8657fcf487eba32982e14912 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -575,9 +575,9 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { return -1; } assert(rid == pSyncNode->rid); - sDebug("vgId:%d sync event %s currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId, - syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pMsg->msgType), - pMsg->msgType); + sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId, + syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm, + TMSG_INFO(pMsg->msgType), pMsg->msgType); ret = syncNodePropose(pSyncNode, pMsg, isWeak); taosReleaseRef(tsNodeRefId, pSyncNode->rid); @@ -586,9 +586,9 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak) { int32_t ret = 0; - sDebug("vgId:%d sync event %s currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId, - syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pMsg->msgType), - pMsg->msgType); + sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId, + syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm, + TMSG_INFO(pMsg->msgType), pMsg->msgType); if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { SRespStub stub; @@ -834,8 +834,8 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { // snapshot meta // pSyncNode->sMeta.lastConfigIndex = -1; - sDebug("vgId:%d sync event %s currentTerm:%lu sync open", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), - pSyncNode->pRaftStore->currentTerm); + sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu sync open", pSyncNode->vgId, + syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm); return pSyncNode; } @@ -882,8 +882,8 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) { } void syncNodeClose(SSyncNode* pSyncNode) { - sDebug("vgId:%d sync event %s currentTerm:%lu sync close", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), - pSyncNode->pRaftStore->currentTerm); + sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu sync close", pSyncNode->vgId, + syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm); int32_t ret; assert(pSyncNode != NULL); @@ -1322,9 +1322,9 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA]; for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { oldSenders[i] = (pSyncNode->senders)[i]; - sDebug("vgId:%d sync event %s currentTerm:%lu save senders %d, %p, privateTerm:%lu", pSyncNode->vgId, - syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, i, oldSenders[i], - oldSenders[i]->privateTerm); + sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu save senders %d, %p, privateTerm:%lu", + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, i, oldSenders[i], oldSenders[i]->privateTerm); if (gRaftDetailLog) { ; } @@ -1376,9 +1376,12 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex char host[128]; uint16_t port; syncUtilU642Addr((pSyncNode->replicasId)[i].addr, host, sizeof(host), &port); - sDebug("vgId:%d sync event %s currentTerm:%lu reset sender for %lu, newIndex:%d, %s:%d, %p, privateTerm:%lu", - pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, - (pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[j], oldSenders[j]->privateTerm); + sDebug( + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu reset sender for %lu, newIndex:%d, %s:%d, %p, " + "privateTerm:%lu", + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, (pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[j], + oldSenders[j]->privateTerm); (pSyncNode->senders)[i] = oldSenders[j]; oldSenders[j] = NULL; reset = true; @@ -1386,9 +1389,11 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex // reset replicaIndex int32_t oldreplicaIndex = (pSyncNode->senders)[i]->replicaIndex; (pSyncNode->senders)[i]->replicaIndex = i; - sDebug("vgId:%d sync event %s currentTerm:%lu udpate replicaIndex from %d to %d, %s:%d, %p, reset:%d", - pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, - oldreplicaIndex, i, host, port, (pSyncNode->senders)[i], reset); + sDebug( + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu udpate replicaIndex from %d to %d, %s:%d, %p, " + "reset:%d", + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, oldreplicaIndex, i, host, port, (pSyncNode->senders)[i], reset); } } } @@ -1397,9 +1402,10 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { if ((pSyncNode->senders)[i] == NULL) { (pSyncNode->senders)[i] = snapshotSenderCreate(pSyncNode, i); - sDebug("vgId:%d sync event %s currentTerm:%lu create new sender %p replicaIndex:%d, privateTerm:%lu", - pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, - (pSyncNode->senders)[i], i, (pSyncNode->senders)[i]->privateTerm); + sDebug( + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu create new sender %p replicaIndex:%d, privateTerm:%lu", + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, (pSyncNode->senders)[i], i, (pSyncNode->senders)[i]->privateTerm); } } @@ -1407,8 +1413,9 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { if (oldSenders[i] != NULL) { snapshotSenderDestroy(oldSenders[i]); - sDebug("vgId:%d sync event %s currentTerm:%lu delete old sender %p replicaIndex:%d", pSyncNode->vgId, - syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, oldSenders[i], i); + sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu delete old sender %p replicaIndex:%d", + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, oldSenders[i], i); oldSenders[i] = NULL; } } @@ -1479,10 +1486,11 @@ void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) { void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { sDebug( - "vgId:%d sync event %s currentTerm:%lu become follower, isStandBy:%d, replicaNum:%d, " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu become follower, isStandBy:%d, replicaNum:%d, " "restoreFinish:%d, %s", - pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, - pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->restoreFinish, debugStr); + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, + pSyncNode->restoreFinish, debugStr); // maybe clear leader cache if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { @@ -1519,9 +1527,12 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) { // reset restoreFinish pSyncNode->restoreFinish = false; - sDebug("vgId:%d sync event %s currentTerm:%lu become leader, isStandBy:%d, replicaNum:%d, restoreFinish:%d, %s", - pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, - pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->restoreFinish, debugStr); + sDebug( + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu become leader, isStandBy:%d, replicaNum:%d, " + "restoreFinish:%d, %s", + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, + pSyncNode->restoreFinish, debugStr); // state change pSyncNode->state = TAOS_SYNC_STATE_LEADER; @@ -2095,14 +2106,15 @@ const char* syncStr(ESyncState state) { static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) { SyncLeaderTransfer* pSyncLeaderTransfer = syncLeaderTransferFromRpcMsg2(pRpcMsg); - sDebug("vgId:%d sync event %s currentTerm:%lu begin leader transfer", ths->vgId, syncUtilState2String(ths->state), - ths->pRaftStore->currentTerm); + sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu begin leader transfer", ths->vgId, + syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm); if (strcmp(pSyncLeaderTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 && pSyncLeaderTransfer->newNodeInfo.nodePort == ths->myNodeInfo.nodePort) { - sDebug("vgId:%d sync event %s currentTerm:%lu maybe leader transfer to %s:%d %lu", ths->vgId, - syncUtilState2String(ths->state), ths->pRaftStore->currentTerm, pSyncLeaderTransfer->newNodeInfo.nodeFqdn, - pSyncLeaderTransfer->newNodeInfo.nodePort, pSyncLeaderTransfer->newLeaderId.addr); + sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu maybe leader transfer to %s:%d %lu", ths->vgId, + syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, + pSyncLeaderTransfer->newNodeInfo.nodeFqdn, pSyncLeaderTransfer->newNodeInfo.nodePort, + pSyncLeaderTransfer->newLeaderId.addr); // reset elect timer now! int32_t electMS = 1; @@ -2240,9 +2252,10 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) { int32_t code = 0; ESyncState state = flag; - sDebug("vgId:%d sync event %s currentTerm:%lu commit by wal from index:%" PRId64 " to index:%" PRId64 ", %s", - ths->vgId, syncUtilState2String(ths->state), ths->pRaftStore->currentTerm, beginIndex, endIndex, - syncUtilState2String(state)); + sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu commit by wal from index:%" PRId64 " to index:%" PRId64 + ", %s", + ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, beginIndex, + endIndex, syncUtilState2String(state)); // execute fsm if (ths->pFsm != NULL) { @@ -2290,9 +2303,9 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, ths->pFsm->FpRestoreFinishCb(ths->pFsm); } ths->restoreFinish = true; - sDebug("vgId:%d sync event %s currentTerm:%lu restore finish, %s, index:%ld", ths->vgId, - syncUtilState2String(ths->state), ths->pRaftStore->currentTerm, syncUtilState2String(ths->state), - pEntry->index); + sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu restore finish, %s, index:%ld", ths->vgId, + syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, + syncUtilState2String(ths->state), pEntry->index); } } diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 308edc0b93b4bd0aba5e1cae1886d453a8400f6e..7c6d97a96e7d61be579f50ae443524c6b464c047 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -163,10 +163,12 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr walFsync(pWal, true); - sDebug("vgId:%d sync event %s currentTerm:%lu write index:%ld, isStandBy:%d, msgType:%s,%d, originalRpcType:%s,%d", - pData->pSyncNode->vgId, syncUtilState2String(pData->pSyncNode->state), - pData->pSyncNode->pRaftStore->currentTerm, pEntry->index, pData->pSyncNode->pRaftCfg->isStandBy, - TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType); + sDebug( + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu write index:%ld, isStandBy:%d, msgType:%s,%d, " + "originalRpcType:%s,%d", + pData->pSyncNode->vgId, syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->commitIndex, + pData->pSyncNode->pRaftStore->currentTerm, pEntry->index, pData->pSyncNode->pRaftCfg->isStandBy, + TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType); return code; } @@ -323,11 +325,11 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { walFsync(pWal, true); sDebug( - "vgId:%d sync event %s currentTerm:%lu old write index:%ld, isStandBy:%d, msgType:%s,%d, " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu old write index:%ld, isStandBy:%d, msgType:%s,%d, " "originalRpcType:%s,%d", - pData->pSyncNode->vgId, syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->pRaftStore->currentTerm, - pEntry->index, pData->pSyncNode->pRaftCfg->isStandBy, TMSG_INFO(pEntry->msgType), pEntry->msgType, - TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType); + pData->pSyncNode->vgId, syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->commitIndex, + pData->pSyncNode->pRaftStore->currentTerm, pEntry->index, pData->pSyncNode->pRaftCfg->isStandBy, + TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType); return code; } diff --git a/source/libs/sync/src/syncRespMgr.c b/source/libs/sync/src/syncRespMgr.c index cc4004a6bd66c2d5ae08e1105642ead859e97913..551c2611b06cc8a93cb5bad09b23ccbd5687fb44 100644 --- a/source/libs/sync/src/syncRespMgr.c +++ b/source/libs/sync/src/syncRespMgr.c @@ -46,10 +46,11 @@ int64_t syncRespMgrAdd(SSyncRespMgr *pObj, SRespStub *pStub) { taosHashPut(pObj->pRespHash, &keyCode, sizeof(keyCode), pStub, sizeof(SRespStub)); SSyncNode *pSyncNode = pObj->data; - sDebug("vgId:%d sync event %s currentTerm:%lu resp mgr add, msgType:%s,%d seq:%lu handle:%p ahandle:%p", - pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, - TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, keyCode, pStub->rpcMsg.info.handle, - pStub->rpcMsg.info.ahandle); + sDebug( + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu resp mgr add, msgType:%s,%d seq:%lu handle:%p ahandle:%p", + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, keyCode, + pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle); taosThreadMutexUnlock(&(pObj->mutex)); return keyCode; @@ -72,10 +73,12 @@ int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub) { memcpy(pStub, pTmp, sizeof(SRespStub)); SSyncNode *pSyncNode = pObj->data; - sDebug("vgId:%d sync event %s currentTerm:%lu resp mgr get, msgType:%s,%d seq:%lu handle:%p ahandle:%p", - pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, - TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, index, pStub->rpcMsg.info.handle, - pStub->rpcMsg.info.ahandle); + sDebug( + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu resp mgr get, msgType:%s,%d seq:%lu handle:%p " + "ahandle:%p", + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, index, + pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle); taosThreadMutexUnlock(&(pObj->mutex)); return 1; // get one object @@ -92,10 +95,12 @@ int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStu memcpy(pStub, pTmp, sizeof(SRespStub)); SSyncNode *pSyncNode = pObj->data; - sDebug("vgId:%d sync event %s currentTerm:%lu resp mgr get and del, msgType:%s,%d seq:%lu handle:%p ahandle:%p", - pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, - TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, index, pStub->rpcMsg.info.handle, - pStub->rpcMsg.info.ahandle); + sDebug( + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu resp mgr get and del, msgType:%s,%d seq:%lu handle:%p " + "ahandle:%p", + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, index, + pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle); taosHashRemove(pObj->pRespHash, &index, sizeof(index)); taosThreadMutexUnlock(&(pObj->mutex)); diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 68336499bfda5a7d1b0a66d84feeb51ff4ce2dda..ef2011cc2620f44e8caf149b9488f14b5bad5bca 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -141,21 +141,23 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); sDebug( - "vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d " + "lastApplyIndex:%ld " "lastApplyTerm:%lu " "lastConfigIndex:%ld privateTerm:%lu send " "msg:%s", - pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), + pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm, msgStr); taosMemoryFree(msgStr); } else { sDebug( - "vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d " + "lastApplyIndex:%ld " "lastApplyTerm:%lu " "lastConfigIndex:%ld privateTerm:%lu", - pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), + pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm); @@ -285,31 +287,34 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); sDebug( - "vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d " + "lastApplyIndex:%ld " "lastApplyTerm:%lu " "lastConfigIndex:%ld privateTerm:%lu send " "msg:%s", - pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), + pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm, msgStr); taosMemoryFree(msgStr); } else { sDebug( - "vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d " + "lastApplyIndex:%ld " "lastApplyTerm:%lu " "lastConfigIndex:%ld privateTerm:%lu", - pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), + pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm); } } else { sDebug( - "vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d sending seq:%d ack:%d lastApplyIndex:%ld " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d sending seq:%d ack:%d " + "lastApplyIndex:%ld " "lastApplyTerm:%lu " "lastConfigIndex:%ld privateTerm:%lu", - pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), + pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm); @@ -344,16 +349,19 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); sDebug( - "vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d privateTerm:%lu send " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d " + "privateTerm:%lu send " "msg:%s", - pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), + pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->privateTerm, msgStr); taosMemoryFree(msgStr); } else { - sDebug("vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d privateTerm:%lu", - pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), - pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->privateTerm); + sDebug( + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d " + "privateTerm:%lu", + pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex, + pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->privateTerm); } syncSnapshotSendDestroy(pMsg); @@ -586,19 +594,23 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); sDebug( - "vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d begin ack:%d, " + "lastIndex:%ld, " "lastTerm:%lu, " "lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s", - pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, host, port, - pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr); + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, + pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr); taosMemoryFree(msgStr); } else { sDebug( - "vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d begin ack:%d, " + "lastIndex:%ld, " "lastTerm:%lu, " "lastConfigIndex:%ld privateTerm:%lu", - pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, host, port, - pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm); + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, + pMsg->lastConfigIndex, pReceiver->privateTerm); } } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) { @@ -636,18 +648,20 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { bool isDrop; if (IamInNew) { sDebug( - "vgId:%d sync event %s currentTerm:%lu update config by snapshot, lastIndex:%ld, lastTerm:%lu, " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu update config by snapshot, lastIndex:%ld, " + "lastTerm:%lu, " "lastConfigIndex:%ld ", - pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, - pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex); + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex); syncNodeUpdateConfig(pSyncNode, &newSyncCfg, pMsg->lastConfigIndex, &isDrop); } else { sDebug( - "vgId:%d sync event %s currentTerm:%lu do not update config by snapshot, I am not in newCfg, " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu do not update config by snapshot, I am not in " + "newCfg, " "lastIndex:%ld, lastTerm:%lu, " "lastConfigIndex:%ld ", - pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, - pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex); + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex); } // change isStandBy to normal @@ -680,21 +694,23 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { if (gRaftDetailLog) { char *logSimpleStr = logStoreSimple2Str(pSyncNode->pLogStore); sDebug( - "vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d finish, update log begin index:%ld, " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d finish, update log begin " + "index:%ld, " "snapshot.lastApplyIndex:%ld, " "snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, privateTerm:%lu, raft log:%s", - pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, host, port, - pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm, snapshot.lastConfigIndex, - pReceiver->privateTerm, logSimpleStr); + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, host, port, pMsg->lastIndex + 1, snapshot.lastApplyIndex, + snapshot.lastApplyTerm, snapshot.lastConfigIndex, pReceiver->privateTerm, logSimpleStr); taosMemoryFree(logSimpleStr); } else { sDebug( - "vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d finish, update log begin index:%ld, " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d finish, update log begin " + "index:%ld, " "snapshot.lastApplyIndex:%ld, " "snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, privateTerm:%lu", - pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, host, port, - pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm, snapshot.lastConfigIndex, - pReceiver->privateTerm); + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, host, port, pMsg->lastIndex + 1, snapshot.lastApplyIndex, + snapshot.lastApplyTerm, snapshot.lastConfigIndex, pReceiver->privateTerm); } pReceiver->pWriter = NULL; @@ -705,17 +721,19 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); sDebug( - "vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d end ack:%d, " + "lastIndex:%ld, lastTerm:%lu, " "lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s", - pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), + pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr); taosMemoryFree(msgStr); } else { sDebug( - "vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d end ack:%d, " + "lastIndex:%ld, lastTerm:%lu, " "lastConfigIndex:%ld, privateTerm:%lu", - pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), + pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm); } @@ -732,20 +750,22 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); sDebug( - "vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d force close ack:%d, " + "lastIndex:%ld, " "lastTerm:%lu, " "lastConfigIndex:%ld, privateTerm:%lu, recv " "msg:%s", - pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), + pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr); taosMemoryFree(msgStr); } else { sDebug( - "vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d force close ack:%d, " + "lastIndex:%ld, " "lastTerm:%lu, " "lastConfigIndex:%ld, privateTerm:%lu", - pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), + pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm); } @@ -767,19 +787,23 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); sDebug( - "vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, " + "lastIndex:%ld, " "lastTerm:%lu, " "lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s", - pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, host, port, - pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr); + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, + pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr); taosMemoryFree(msgStr); } else { sDebug( - "vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, " + "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, " + "lastIndex:%ld, " "lastTerm:%lu, " "lastConfigIndex:%ld, privateTerm:%lu", - pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, host, port, - pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm); + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, + pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, + pMsg->lastConfigIndex, pReceiver->privateTerm); } } else {