提交 dd024941 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/feature/3.0_mhli' into fix/dnode

...@@ -713,8 +713,8 @@ static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -713,8 +713,8 @@ static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) {
// delete confict entries // delete confict entries
code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin); code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin);
ASSERT(code == 0); ASSERT(code == 0);
sDebug("vgId:%d sync event %s currentTerm:%lu log truncate, from %ld to %ld", ths->vgId, sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu log truncate, from %ld to %ld", ths->vgId,
syncUtilState2String(ths->state), ths->pRaftStore->currentTerm, delBegin, delEnd); syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, delBegin, delEnd);
logStoreSimpleLog2("after syncNodeMakeLogSame", ths->pLogStore); logStoreSimpleLog2("after syncNodeMakeLogSame", ths->pLogStore);
return code; return code;
...@@ -995,8 +995,10 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs ...@@ -995,8 +995,10 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
SyncIndex commitEnd = snapshot.lastApplyIndex; SyncIndex commitEnd = snapshot.lastApplyIndex;
ths->commitIndex = snapshot.lastApplyIndex; ths->commitIndex = snapshot.lastApplyIndex;
sDebug("vgId:%d sync event %s currentTerm:%lu commit by snapshot from index:%ld to index:%ld", ths->vgId, sDebug(
syncUtilState2String(ths->state), ths->pRaftStore->currentTerm, commitBegin, commitEnd); "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu commit by snapshot from index:%ld to index:%ld",
ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm,
commitBegin, commitEnd);
} }
SyncIndex beginIndex = ths->commitIndex + 1; SyncIndex beginIndex = ths->commitIndex + 1;
......
...@@ -190,21 +190,21 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries ...@@ -190,21 +190,21 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
if (gRaftDetailLog) { if (gRaftDetailLog) {
char* s = snapshotSender2Str(pSender); char* s = snapshotSender2Str(pSender);
sDebug( sDebug(
"vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d start sender first time, " "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d start sender first time, "
"lastApplyIndex:%ld " "lastApplyIndex:%ld "
"lastApplyTerm:%lu " "lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu " "lastConfigIndex:%ld privateTerm:%lu "
"sender:%s", "sender:%s",
ths->vgId, syncUtilState2String(ths->state), ths->pRaftStore->currentTerm, host, port, ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, host, port,
pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex,
pSender->privateTerm, s); pSender->privateTerm, s);
taosMemoryFree(s); taosMemoryFree(s);
} else { } else {
sDebug( sDebug(
"vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d start sender first time, " "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d start sender first time, "
"lastApplyIndex:%ld " "lastApplyIndex:%ld "
"lastApplyTerm:%lu lastConfigIndex:%ld privateTerm:%lu", "lastApplyTerm:%lu lastConfigIndex:%ld privateTerm:%lu",
ths->vgId, syncUtilState2String(ths->state), ths->pRaftStore->currentTerm, host, port, ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, host, port,
pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex,
pSender->privateTerm); pSender->privateTerm);
} }
......
...@@ -56,9 +56,9 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { ...@@ -56,9 +56,9 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
SyncIndex commitEnd = snapshot.lastApplyIndex; SyncIndex commitEnd = snapshot.lastApplyIndex;
pSyncNode->commitIndex = snapshot.lastApplyIndex; pSyncNode->commitIndex = snapshot.lastApplyIndex;
sDebug("vgId:%d sync event %s currentTerm:%lu commit by snapshot from index:%ld to index:%ld", pSyncNode->vgId, sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu commit by snapshot from index:%ld to index:%ld",
syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
snapshot.lastApplyIndex); pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, snapshot.lastApplyIndex);
} }
// update commit index // update commit index
......
...@@ -575,9 +575,9 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { ...@@ -575,9 +575,9 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
return -1; return -1;
} }
assert(rid == pSyncNode->rid); assert(rid == pSyncNode->rid);
sDebug("vgId:%d sync event %s currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId, sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId,
syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pMsg->msgType), syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm,
pMsg->msgType); TMSG_INFO(pMsg->msgType), pMsg->msgType);
ret = syncNodePropose(pSyncNode, pMsg, isWeak); ret = syncNodePropose(pSyncNode, pMsg, isWeak);
taosReleaseRef(tsNodeRefId, pSyncNode->rid); taosReleaseRef(tsNodeRefId, pSyncNode->rid);
...@@ -586,9 +586,9 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { ...@@ -586,9 +586,9 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak) { int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak) {
int32_t ret = 0; int32_t ret = 0;
sDebug("vgId:%d sync event %s currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId, sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId,
syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pMsg->msgType), syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm,
pMsg->msgType); TMSG_INFO(pMsg->msgType), pMsg->msgType);
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
SRespStub stub; SRespStub stub;
...@@ -834,8 +834,8 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { ...@@ -834,8 +834,8 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
// snapshot meta // snapshot meta
// pSyncNode->sMeta.lastConfigIndex = -1; // pSyncNode->sMeta.lastConfigIndex = -1;
sDebug("vgId:%d sync event %s currentTerm:%lu sync open", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu sync open", pSyncNode->vgId,
pSyncNode->pRaftStore->currentTerm); syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm);
return pSyncNode; return pSyncNode;
} }
...@@ -882,8 +882,8 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) { ...@@ -882,8 +882,8 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) {
} }
void syncNodeClose(SSyncNode* pSyncNode) { void syncNodeClose(SSyncNode* pSyncNode) {
sDebug("vgId:%d sync event %s currentTerm:%lu sync close", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu sync close", pSyncNode->vgId,
pSyncNode->pRaftStore->currentTerm); syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm);
int32_t ret; int32_t ret;
assert(pSyncNode != NULL); assert(pSyncNode != NULL);
...@@ -1322,9 +1322,9 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex ...@@ -1322,9 +1322,9 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA]; SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA];
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
oldSenders[i] = (pSyncNode->senders)[i]; oldSenders[i] = (pSyncNode->senders)[i];
sDebug("vgId:%d sync event %s currentTerm:%lu save senders %d, %p, privateTerm:%lu", pSyncNode->vgId, sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu save senders %d, %p, privateTerm:%lu",
syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, i, oldSenders[i], pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
oldSenders[i]->privateTerm); pSyncNode->pRaftStore->currentTerm, i, oldSenders[i], oldSenders[i]->privateTerm);
if (gRaftDetailLog) { if (gRaftDetailLog) {
; ;
} }
...@@ -1376,9 +1376,12 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex ...@@ -1376,9 +1376,12 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
char host[128]; char host[128];
uint16_t port; uint16_t port;
syncUtilU642Addr((pSyncNode->replicasId)[i].addr, host, sizeof(host), &port); syncUtilU642Addr((pSyncNode->replicasId)[i].addr, host, sizeof(host), &port);
sDebug("vgId:%d sync event %s currentTerm:%lu reset sender for %lu, newIndex:%d, %s:%d, %p, privateTerm:%lu", sDebug(
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu reset sender for %lu, newIndex:%d, %s:%d, %p, "
(pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[j], oldSenders[j]->privateTerm); "privateTerm:%lu",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, (pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[j],
oldSenders[j]->privateTerm);
(pSyncNode->senders)[i] = oldSenders[j]; (pSyncNode->senders)[i] = oldSenders[j];
oldSenders[j] = NULL; oldSenders[j] = NULL;
reset = true; reset = true;
...@@ -1386,9 +1389,11 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex ...@@ -1386,9 +1389,11 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
// reset replicaIndex // reset replicaIndex
int32_t oldreplicaIndex = (pSyncNode->senders)[i]->replicaIndex; int32_t oldreplicaIndex = (pSyncNode->senders)[i]->replicaIndex;
(pSyncNode->senders)[i]->replicaIndex = i; (pSyncNode->senders)[i]->replicaIndex = i;
sDebug("vgId:%d sync event %s currentTerm:%lu udpate replicaIndex from %d to %d, %s:%d, %p, reset:%d", sDebug(
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu udpate replicaIndex from %d to %d, %s:%d, %p, "
oldreplicaIndex, i, host, port, (pSyncNode->senders)[i], reset); "reset:%d",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, oldreplicaIndex, i, host, port, (pSyncNode->senders)[i], reset);
} }
} }
} }
...@@ -1397,9 +1402,10 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex ...@@ -1397,9 +1402,10 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
if ((pSyncNode->senders)[i] == NULL) { if ((pSyncNode->senders)[i] == NULL) {
(pSyncNode->senders)[i] = snapshotSenderCreate(pSyncNode, i); (pSyncNode->senders)[i] = snapshotSenderCreate(pSyncNode, i);
sDebug("vgId:%d sync event %s currentTerm:%lu create new sender %p replicaIndex:%d, privateTerm:%lu", sDebug(
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu create new sender %p replicaIndex:%d, privateTerm:%lu",
(pSyncNode->senders)[i], i, (pSyncNode->senders)[i]->privateTerm); pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, (pSyncNode->senders)[i], i, (pSyncNode->senders)[i]->privateTerm);
} }
} }
...@@ -1407,8 +1413,9 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex ...@@ -1407,8 +1413,9 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
if (oldSenders[i] != NULL) { if (oldSenders[i] != NULL) {
snapshotSenderDestroy(oldSenders[i]); snapshotSenderDestroy(oldSenders[i]);
sDebug("vgId:%d sync event %s currentTerm:%lu delete old sender %p replicaIndex:%d", pSyncNode->vgId, sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu delete old sender %p replicaIndex:%d",
syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, oldSenders[i], i); pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, oldSenders[i], i);
oldSenders[i] = NULL; oldSenders[i] = NULL;
} }
} }
...@@ -1479,10 +1486,11 @@ void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) { ...@@ -1479,10 +1486,11 @@ void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
sDebug( sDebug(
"vgId:%d sync event %s currentTerm:%lu become follower, isStandBy:%d, replicaNum:%d, " "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu become follower, isStandBy:%d, replicaNum:%d, "
"restoreFinish:%d, %s", "restoreFinish:%d, %s",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->restoreFinish, debugStr); pSyncNode->pRaftStore->currentTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum,
pSyncNode->restoreFinish, debugStr);
// maybe clear leader cache // maybe clear leader cache
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
...@@ -1519,9 +1527,12 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) { ...@@ -1519,9 +1527,12 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
// reset restoreFinish // reset restoreFinish
pSyncNode->restoreFinish = false; pSyncNode->restoreFinish = false;
sDebug("vgId:%d sync event %s currentTerm:%lu become leader, isStandBy:%d, replicaNum:%d, restoreFinish:%d, %s", sDebug(
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu become leader, isStandBy:%d, replicaNum:%d, "
pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->restoreFinish, debugStr); "restoreFinish:%d, %s",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum,
pSyncNode->restoreFinish, debugStr);
// state change // state change
pSyncNode->state = TAOS_SYNC_STATE_LEADER; pSyncNode->state = TAOS_SYNC_STATE_LEADER;
...@@ -2095,14 +2106,15 @@ const char* syncStr(ESyncState state) { ...@@ -2095,14 +2106,15 @@ const char* syncStr(ESyncState state) {
static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) { static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) {
SyncLeaderTransfer* pSyncLeaderTransfer = syncLeaderTransferFromRpcMsg2(pRpcMsg); SyncLeaderTransfer* pSyncLeaderTransfer = syncLeaderTransferFromRpcMsg2(pRpcMsg);
sDebug("vgId:%d sync event %s currentTerm:%lu begin leader transfer", ths->vgId, syncUtilState2String(ths->state), sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu begin leader transfer", ths->vgId,
ths->pRaftStore->currentTerm); syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm);
if (strcmp(pSyncLeaderTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 && if (strcmp(pSyncLeaderTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 &&
pSyncLeaderTransfer->newNodeInfo.nodePort == ths->myNodeInfo.nodePort) { pSyncLeaderTransfer->newNodeInfo.nodePort == ths->myNodeInfo.nodePort) {
sDebug("vgId:%d sync event %s currentTerm:%lu maybe leader transfer to %s:%d %lu", ths->vgId, sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu maybe leader transfer to %s:%d %lu", ths->vgId,
syncUtilState2String(ths->state), ths->pRaftStore->currentTerm, pSyncLeaderTransfer->newNodeInfo.nodeFqdn, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm,
pSyncLeaderTransfer->newNodeInfo.nodePort, pSyncLeaderTransfer->newLeaderId.addr); pSyncLeaderTransfer->newNodeInfo.nodeFqdn, pSyncLeaderTransfer->newNodeInfo.nodePort,
pSyncLeaderTransfer->newLeaderId.addr);
// reset elect timer now! // reset elect timer now!
int32_t electMS = 1; int32_t electMS = 1;
...@@ -2240,9 +2252,10 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE ...@@ -2240,9 +2252,10 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) { int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) {
int32_t code = 0; int32_t code = 0;
ESyncState state = flag; ESyncState state = flag;
sDebug("vgId:%d sync event %s currentTerm:%lu commit by wal from index:%" PRId64 " to index:%" PRId64 ", %s", sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu commit by wal from index:%" PRId64 " to index:%" PRId64
ths->vgId, syncUtilState2String(ths->state), ths->pRaftStore->currentTerm, beginIndex, endIndex, ", %s",
syncUtilState2String(state)); ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, beginIndex,
endIndex, syncUtilState2String(state));
// execute fsm // execute fsm
if (ths->pFsm != NULL) { if (ths->pFsm != NULL) {
...@@ -2290,9 +2303,9 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, ...@@ -2290,9 +2303,9 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
ths->pFsm->FpRestoreFinishCb(ths->pFsm); ths->pFsm->FpRestoreFinishCb(ths->pFsm);
} }
ths->restoreFinish = true; ths->restoreFinish = true;
sDebug("vgId:%d sync event %s currentTerm:%lu restore finish, %s, index:%ld", ths->vgId, sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu restore finish, %s, index:%ld", ths->vgId,
syncUtilState2String(ths->state), ths->pRaftStore->currentTerm, syncUtilState2String(ths->state), syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm,
pEntry->index); syncUtilState2String(ths->state), pEntry->index);
} }
} }
......
...@@ -163,8 +163,10 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr ...@@ -163,8 +163,10 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
walFsync(pWal, true); walFsync(pWal, true);
sDebug("vgId:%d sync event %s currentTerm:%lu write index:%ld, isStandBy:%d, msgType:%s,%d, originalRpcType:%s,%d", sDebug(
pData->pSyncNode->vgId, syncUtilState2String(pData->pSyncNode->state), "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu write index:%ld, isStandBy:%d, msgType:%s,%d, "
"originalRpcType:%s,%d",
pData->pSyncNode->vgId, syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->commitIndex,
pData->pSyncNode->pRaftStore->currentTerm, pEntry->index, pData->pSyncNode->pRaftCfg->isStandBy, pData->pSyncNode->pRaftStore->currentTerm, pEntry->index, pData->pSyncNode->pRaftCfg->isStandBy,
TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType); TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType);
...@@ -323,11 +325,11 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { ...@@ -323,11 +325,11 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
walFsync(pWal, true); walFsync(pWal, true);
sDebug( sDebug(
"vgId:%d sync event %s currentTerm:%lu old write index:%ld, isStandBy:%d, msgType:%s,%d, " "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu old write index:%ld, isStandBy:%d, msgType:%s,%d, "
"originalRpcType:%s,%d", "originalRpcType:%s,%d",
pData->pSyncNode->vgId, syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->pRaftStore->currentTerm, pData->pSyncNode->vgId, syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->commitIndex,
pEntry->index, pData->pSyncNode->pRaftCfg->isStandBy, TMSG_INFO(pEntry->msgType), pEntry->msgType, pData->pSyncNode->pRaftStore->currentTerm, pEntry->index, pData->pSyncNode->pRaftCfg->isStandBy,
TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType); TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType);
return code; return code;
} }
......
...@@ -46,10 +46,11 @@ int64_t syncRespMgrAdd(SSyncRespMgr *pObj, SRespStub *pStub) { ...@@ -46,10 +46,11 @@ int64_t syncRespMgrAdd(SSyncRespMgr *pObj, SRespStub *pStub) {
taosHashPut(pObj->pRespHash, &keyCode, sizeof(keyCode), pStub, sizeof(SRespStub)); taosHashPut(pObj->pRespHash, &keyCode, sizeof(keyCode), pStub, sizeof(SRespStub));
SSyncNode *pSyncNode = pObj->data; SSyncNode *pSyncNode = pObj->data;
sDebug("vgId:%d sync event %s currentTerm:%lu resp mgr add, msgType:%s,%d seq:%lu handle:%p ahandle:%p", sDebug(
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu resp mgr add, msgType:%s,%d seq:%lu handle:%p ahandle:%p",
TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, keyCode, pStub->rpcMsg.info.handle, pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pStub->rpcMsg.info.ahandle); pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, keyCode,
pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle);
taosThreadMutexUnlock(&(pObj->mutex)); taosThreadMutexUnlock(&(pObj->mutex));
return keyCode; return keyCode;
...@@ -72,10 +73,12 @@ int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub) { ...@@ -72,10 +73,12 @@ int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub) {
memcpy(pStub, pTmp, sizeof(SRespStub)); memcpy(pStub, pTmp, sizeof(SRespStub));
SSyncNode *pSyncNode = pObj->data; SSyncNode *pSyncNode = pObj->data;
sDebug("vgId:%d sync event %s currentTerm:%lu resp mgr get, msgType:%s,%d seq:%lu handle:%p ahandle:%p", sDebug(
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu resp mgr get, msgType:%s,%d seq:%lu handle:%p "
TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, index, pStub->rpcMsg.info.handle, "ahandle:%p",
pStub->rpcMsg.info.ahandle); pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, index,
pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle);
taosThreadMutexUnlock(&(pObj->mutex)); taosThreadMutexUnlock(&(pObj->mutex));
return 1; // get one object return 1; // get one object
...@@ -92,10 +95,12 @@ int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStu ...@@ -92,10 +95,12 @@ int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStu
memcpy(pStub, pTmp, sizeof(SRespStub)); memcpy(pStub, pTmp, sizeof(SRespStub));
SSyncNode *pSyncNode = pObj->data; SSyncNode *pSyncNode = pObj->data;
sDebug("vgId:%d sync event %s currentTerm:%lu resp mgr get and del, msgType:%s,%d seq:%lu handle:%p ahandle:%p", sDebug(
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu resp mgr get and del, msgType:%s,%d seq:%lu handle:%p "
TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, index, pStub->rpcMsg.info.handle, "ahandle:%p",
pStub->rpcMsg.info.ahandle); pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, index,
pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle);
taosHashRemove(pObj->pRespHash, &index, sizeof(index)); taosHashRemove(pObj->pRespHash, &index, sizeof(index));
taosThreadMutexUnlock(&(pObj->mutex)); taosThreadMutexUnlock(&(pObj->mutex));
......
...@@ -141,21 +141,23 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) { ...@@ -141,21 +141,23 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) {
if (gRaftDetailLog) { if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg); char *msgStr = syncSnapshotSend2Str(pMsg);
sDebug( sDebug(
"vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld " "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d "
"lastApplyIndex:%ld "
"lastApplyTerm:%lu " "lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu send " "lastConfigIndex:%ld privateTerm:%lu send "
"msg:%s", "msg:%s",
pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex,
pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack,
pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex,
pSender->privateTerm, msgStr); pSender->privateTerm, msgStr);
taosMemoryFree(msgStr); taosMemoryFree(msgStr);
} else { } else {
sDebug( sDebug(
"vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld " "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d "
"lastApplyIndex:%ld "
"lastApplyTerm:%lu " "lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu", "lastConfigIndex:%ld privateTerm:%lu",
pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex,
pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack,
pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex,
pSender->privateTerm); pSender->privateTerm);
...@@ -285,31 +287,34 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) { ...@@ -285,31 +287,34 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
if (gRaftDetailLog) { if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg); char *msgStr = syncSnapshotSend2Str(pMsg);
sDebug( sDebug(
"vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld " "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d "
"lastApplyIndex:%ld "
"lastApplyTerm:%lu " "lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu send " "lastConfigIndex:%ld privateTerm:%lu send "
"msg:%s", "msg:%s",
pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex,
pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack,
pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex,
pSender->privateTerm, msgStr); pSender->privateTerm, msgStr);
taosMemoryFree(msgStr); taosMemoryFree(msgStr);
} else { } else {
sDebug( sDebug(
"vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld " "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d "
"lastApplyIndex:%ld "
"lastApplyTerm:%lu " "lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu", "lastConfigIndex:%ld privateTerm:%lu",
pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex,
pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack,
pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex,
pSender->privateTerm); pSender->privateTerm);
} }
} else { } else {
sDebug( sDebug(
"vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d sending seq:%d ack:%d lastApplyIndex:%ld " "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d sending seq:%d ack:%d "
"lastApplyIndex:%ld "
"lastApplyTerm:%lu " "lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu", "lastConfigIndex:%ld privateTerm:%lu",
pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex,
pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack,
pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex,
pSender->privateTerm); pSender->privateTerm);
...@@ -344,15 +349,18 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) { ...@@ -344,15 +349,18 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
if (gRaftDetailLog) { if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg); char *msgStr = syncSnapshotSend2Str(pMsg);
sDebug( sDebug(
"vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d privateTerm:%lu send " "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d "
"privateTerm:%lu send "
"msg:%s", "msg:%s",
pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex,
pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->privateTerm, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->privateTerm,
msgStr); msgStr);
taosMemoryFree(msgStr); taosMemoryFree(msgStr);
} else { } else {
sDebug("vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d privateTerm:%lu", sDebug(
pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d "
"privateTerm:%lu",
pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex,
pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->privateTerm); pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->privateTerm);
} }
...@@ -586,19 +594,23 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { ...@@ -586,19 +594,23 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if (gRaftDetailLog) { if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg); char *msgStr = syncSnapshotSend2Str(pMsg);
sDebug( sDebug(
"vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, " "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d begin ack:%d, "
"lastIndex:%ld, "
"lastTerm:%lu, " "lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s", "lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, host, port, pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr); pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm,
pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr);
taosMemoryFree(msgStr); taosMemoryFree(msgStr);
} else { } else {
sDebug( sDebug(
"vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, " "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d begin ack:%d, "
"lastIndex:%ld, "
"lastTerm:%lu, " "lastTerm:%lu, "
"lastConfigIndex:%ld privateTerm:%lu", "lastConfigIndex:%ld privateTerm:%lu",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, host, port, pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm); pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm,
pMsg->lastConfigIndex, pReceiver->privateTerm);
} }
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) { } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
...@@ -636,18 +648,20 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { ...@@ -636,18 +648,20 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
bool isDrop; bool isDrop;
if (IamInNew) { if (IamInNew) {
sDebug( sDebug(
"vgId:%d sync event %s currentTerm:%lu update config by snapshot, lastIndex:%ld, lastTerm:%lu, " "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu update config by snapshot, lastIndex:%ld, "
"lastTerm:%lu, "
"lastConfigIndex:%ld ", "lastConfigIndex:%ld ",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex); pSyncNode->pRaftStore->currentTerm, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex);
syncNodeUpdateConfig(pSyncNode, &newSyncCfg, pMsg->lastConfigIndex, &isDrop); syncNodeUpdateConfig(pSyncNode, &newSyncCfg, pMsg->lastConfigIndex, &isDrop);
} else { } else {
sDebug( sDebug(
"vgId:%d sync event %s currentTerm:%lu do not update config by snapshot, I am not in newCfg, " "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu do not update config by snapshot, I am not in "
"newCfg, "
"lastIndex:%ld, lastTerm:%lu, " "lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld ", "lastConfigIndex:%ld ",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex); pSyncNode->pRaftStore->currentTerm, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex);
} }
// change isStandBy to normal // change isStandBy to normal
...@@ -680,21 +694,23 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { ...@@ -680,21 +694,23 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if (gRaftDetailLog) { if (gRaftDetailLog) {
char *logSimpleStr = logStoreSimple2Str(pSyncNode->pLogStore); char *logSimpleStr = logStoreSimple2Str(pSyncNode->pLogStore);
sDebug( sDebug(
"vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d finish, update log begin index:%ld, " "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d finish, update log begin "
"index:%ld, "
"snapshot.lastApplyIndex:%ld, " "snapshot.lastApplyIndex:%ld, "
"snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, privateTerm:%lu, raft log:%s", "snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, privateTerm:%lu, raft log:%s",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, host, port, pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm, snapshot.lastConfigIndex, pSyncNode->pRaftStore->currentTerm, host, port, pMsg->lastIndex + 1, snapshot.lastApplyIndex,
pReceiver->privateTerm, logSimpleStr); snapshot.lastApplyTerm, snapshot.lastConfigIndex, pReceiver->privateTerm, logSimpleStr);
taosMemoryFree(logSimpleStr); taosMemoryFree(logSimpleStr);
} else { } else {
sDebug( sDebug(
"vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d finish, update log begin index:%ld, " "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d finish, update log begin "
"index:%ld, "
"snapshot.lastApplyIndex:%ld, " "snapshot.lastApplyIndex:%ld, "
"snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, privateTerm:%lu", "snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, privateTerm:%lu",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, host, port, pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm, snapshot.lastConfigIndex, pSyncNode->pRaftStore->currentTerm, host, port, pMsg->lastIndex + 1, snapshot.lastApplyIndex,
pReceiver->privateTerm); snapshot.lastApplyTerm, snapshot.lastConfigIndex, pReceiver->privateTerm);
} }
pReceiver->pWriter = NULL; pReceiver->pWriter = NULL;
...@@ -705,17 +721,19 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { ...@@ -705,17 +721,19 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if (gRaftDetailLog) { if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg); char *msgStr = syncSnapshotSend2Str(pMsg);
sDebug( sDebug(
"vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, " "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d end ack:%d, "
"lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s", "lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s",
pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex,
pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr); pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr);
taosMemoryFree(msgStr); taosMemoryFree(msgStr);
} else { } else {
sDebug( sDebug(
"vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, " "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d end ack:%d, "
"lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu", "lastConfigIndex:%ld, privateTerm:%lu",
pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex,
pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm); pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm);
} }
...@@ -732,20 +750,22 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { ...@@ -732,20 +750,22 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if (gRaftDetailLog) { if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg); char *msgStr = syncSnapshotSend2Str(pMsg);
sDebug( sDebug(
"vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, " "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d force close ack:%d, "
"lastIndex:%ld, "
"lastTerm:%lu, " "lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu, recv " "lastConfigIndex:%ld, privateTerm:%lu, recv "
"msg:%s", "msg:%s",
pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex,
pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr); pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr);
taosMemoryFree(msgStr); taosMemoryFree(msgStr);
} else { } else {
sDebug( sDebug(
"vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, " "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d force close ack:%d, "
"lastIndex:%ld, "
"lastTerm:%lu, " "lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu", "lastConfigIndex:%ld, privateTerm:%lu",
pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex,
pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm); pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm);
} }
...@@ -767,19 +787,23 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { ...@@ -767,19 +787,23 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if (gRaftDetailLog) { if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg); char *msgStr = syncSnapshotSend2Str(pMsg);
sDebug( sDebug(
"vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, " "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, "
"lastIndex:%ld, "
"lastTerm:%lu, " "lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s", "lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, host, port, pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr); pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm,
pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr);
taosMemoryFree(msgStr); taosMemoryFree(msgStr);
} else { } else {
sDebug( sDebug(
"vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, " "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, "
"lastIndex:%ld, "
"lastTerm:%lu, " "lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu", "lastConfigIndex:%ld, privateTerm:%lu",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, host, port, pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm); pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm,
pMsg->lastConfigIndex, pReceiver->privateTerm);
} }
} else { } else {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册