提交 9a9200d2 编写于 作者: M Minghao Li

refactor(sync): add restore finish when become leader again

上级 77b03ac8
......@@ -713,7 +713,8 @@ static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) {
// delete confict entries
code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin);
ASSERT(code == 0);
sDebug("vgId:%d sync event log truncate, from %ld to %ld", ths->vgId, delBegin, delEnd);
sDebug("vgId:%d sync event currentTerm:%lu log truncate, from %ld to %ld", ths->vgId, ths->pRaftStore->currentTerm,
delBegin, delEnd);
logStoreSimpleLog2("after syncNodeMakeLogSame", ths->pLogStore);
return code;
......@@ -994,8 +995,8 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
SyncIndex commitEnd = snapshot.lastApplyIndex;
ths->commitIndex = snapshot.lastApplyIndex;
sDebug("vgId:%d sync event commit by snapshot from index:%ld to index:%ld, %s", ths->vgId, commitBegin,
commitEnd, syncUtilState2String(ths->state));
sDebug("vgId:%d sync event currentTerm:%lu commit by snapshot from index:%ld to index:%ld, %s", ths->vgId,
ths->pRaftStore->currentTerm, commitBegin, commitEnd, syncUtilState2String(ths->state));
}
SyncIndex beginIndex = ths->commitIndex + 1;
......
......@@ -190,18 +190,19 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
if (gRaftDetailLog) {
char* s = snapshotSender2Str(pSender);
sDebug(
"vgId:%d sync event snapshot send to %s:%d start sender first time, lastApplyIndex:%ld lastApplyTerm:%lu "
"vgId:%d sync event currentTerm:%lu snapshot send to %s:%d start sender first time, lastApplyIndex:%ld "
"lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu "
"sender:%s",
ths->vgId, host, port, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm,
pSender->snapshot.lastConfigIndex, pSender->privateTerm, s);
ths->vgId, ths->pRaftStore->currentTerm, host, port, pSender->snapshot.lastApplyIndex,
pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm, s);
taosMemoryFree(s);
} else {
sDebug(
"vgId:%d sync event snapshot send to %s:%d start sender first time, lastApplyIndex:%ld "
"vgId:%d sync event currentTerm:%lu snapshot send to %s:%d start sender first time, lastApplyIndex:%ld "
"lastApplyTerm:%lu lastConfigIndex:%ld privateTerm:%lu",
ths->vgId, host, port, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm,
pSender->snapshot.lastConfigIndex, pSender->privateTerm);
ths->vgId, ths->pRaftStore->currentTerm, host, port, pSender->snapshot.lastApplyIndex,
pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm);
}
}
......
......@@ -56,8 +56,9 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
SyncIndex commitEnd = snapshot.lastApplyIndex;
pSyncNode->commitIndex = snapshot.lastApplyIndex;
sDebug("vgId:%d sync event commit by snapshot from index:%ld to index:%ld, %s", pSyncNode->vgId,
pSyncNode->commitIndex, snapshot.lastApplyIndex, syncUtilState2String(pSyncNode->state));
sDebug("vgId:%d sync event currentTerm:%lu commit by snapshot from index:%ld to index:%ld, %s", pSyncNode->vgId,
pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, snapshot.lastApplyIndex,
syncUtilState2String(pSyncNode->state));
}
// update commit index
......
......@@ -562,7 +562,8 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
return -1;
}
assert(rid == pSyncNode->rid);
sDebug("vgId:%d sync event propose msgType:%s,%d", pSyncNode->vgId, TMSG_INFO(pMsg->msgType), pMsg->msgType);
sDebug("vgId:%d sync event currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId,
pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pMsg->msgType), pMsg->msgType);
ret = syncNodePropose(pSyncNode, pMsg, isWeak);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
......@@ -571,7 +572,8 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak) {
int32_t ret = 0;
sDebug("vgId:%d sync event propose msgType:%s,%d", pSyncNode->vgId, TMSG_INFO(pMsg->msgType), pMsg->msgType);
sDebug("vgId:%d sync event currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId,
pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pMsg->msgType), pMsg->msgType);
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
SRespStub stub;
......@@ -604,8 +606,6 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak)
SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
SSyncInfo* pSyncInfo = (SSyncInfo*)pOldSyncInfo;
sDebug("vgId:%d sync event sync open", pSyncInfo->vgId);
SSyncNode* pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(SSyncNode));
assert(pSyncNode != NULL);
memset(pSyncNode, 0, sizeof(SSyncNode));
......@@ -819,6 +819,8 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
// snapshot meta
// pSyncNode->sMeta.lastConfigIndex = -1;
sDebug("vgId:%d sync event currentTerm:%lu sync open", pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm);
return pSyncNode;
}
......@@ -864,7 +866,7 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) {
}
void syncNodeClose(SSyncNode* pSyncNode) {
sDebug("vgId:%d sync event sync close", pSyncNode->vgId);
sDebug("vgId:%d sync event currentTerm:%lu sync close", pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm);
int32_t ret;
assert(pSyncNode != NULL);
......@@ -1303,8 +1305,8 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA];
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
oldSenders[i] = (pSyncNode->senders)[i];
sDebug("vgId:%d sync event save senders %d, %p, privateTerm:%lu", pSyncNode->vgId, i, oldSenders[i],
oldSenders[i]->privateTerm);
sDebug("vgId:%d sync event currentTerm:%lu save senders %d, %p, privateTerm:%lu", pSyncNode->vgId,
pSyncNode->pRaftStore->currentTerm, i, oldSenders[i], oldSenders[i]->privateTerm);
if (gRaftDetailLog) {
;
}
......@@ -1356,8 +1358,9 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
char host[128];
uint16_t port;
syncUtilU642Addr((pSyncNode->replicasId)[i].addr, host, sizeof(host), &port);
sDebug("vgId:%d sync event reset sender for %lu, newIndex:%d, %s:%d, %p, privateTerm:%lu", pSyncNode->vgId,
(pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[j], oldSenders[j]->privateTerm);
sDebug("vgId:%d sync event currentTerm:%lu reset sender for %lu, newIndex:%d, %s:%d, %p, privateTerm:%lu",
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, (pSyncNode->replicasId)[i].addr, i, host, port,
oldSenders[j], oldSenders[j]->privateTerm);
(pSyncNode->senders)[i] = oldSenders[j];
oldSenders[j] = NULL;
reset = true;
......@@ -1365,8 +1368,9 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
// reset replicaIndex
int32_t oldreplicaIndex = (pSyncNode->senders)[i]->replicaIndex;
(pSyncNode->senders)[i]->replicaIndex = i;
sDebug("vgId:%d sync event udpate replicaIndex from %d to %d, %s:%d, %p, reset:%d", pSyncNode->vgId,
oldreplicaIndex, i, host, port, (pSyncNode->senders)[i], reset);
sDebug("vgId:%d sync event currentTerm:%lu udpate replicaIndex from %d to %d, %s:%d, %p, reset:%d",
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, oldreplicaIndex, i, host, port,
(pSyncNode->senders)[i], reset);
}
}
}
......@@ -1375,8 +1379,9 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
if ((pSyncNode->senders)[i] == NULL) {
(pSyncNode->senders)[i] = snapshotSenderCreate(pSyncNode, i);
sDebug("vgId:%d sync event create new sender %p replicaIndex:%d, privateTerm:%lu", pSyncNode->vgId,
(pSyncNode->senders)[i], i, (pSyncNode->senders)[i]->privateTerm);
sDebug("vgId:%d sync event currentTerm:%lu create new sender %p replicaIndex:%d, privateTerm:%lu",
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, (pSyncNode->senders)[i], i,
(pSyncNode->senders)[i]->privateTerm);
}
}
......@@ -1384,7 +1389,8 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
if (oldSenders[i] != NULL) {
snapshotSenderDestroy(oldSenders[i]);
sDebug("vgId:%d sync event delete old sender %p replicaIndex:%d", pSyncNode->vgId, oldSenders[i], i);
sDebug("vgId:%d sync event currentTerm:%lu delete old sender %p replicaIndex:%d", pSyncNode->vgId,
pSyncNode->pRaftStore->currentTerm, oldSenders[i], i);
oldSenders[i] = NULL;
}
}
......@@ -1454,9 +1460,11 @@ void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
}
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
sDebug("vgId:%d sync event become follower, isStandBy:%d, currentTerm:%lu, replicaNum:%d, restoreFinish:%d, %s",
pSyncNode->vgId, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftStore->currentTerm, pSyncNode->replicaNum,
pSyncNode->restoreFinish, debugStr);
sDebug(
"vgId:%d sync event currentTerm:%lu become follower, isStandBy:%d, replicaNum:%d, "
"restoreFinish:%d, %s",
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum,
pSyncNode->restoreFinish, debugStr);
// maybe clear leader cache
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
......@@ -1493,8 +1501,8 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
// reset restoreFinish
pSyncNode->restoreFinish = false;
sDebug("vgId:%d sync event become leader, isStandBy:%d, currentTerm:%lu, replicaNum:%d, restoreFinish:%d, %s",
pSyncNode->vgId, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftStore->currentTerm, pSyncNode->replicaNum,
sDebug("vgId:%d sync event currentTerm:%lu become leader, isStandBy:%d, replicaNum:%d, restoreFinish:%d, %s",
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum,
pSyncNode->restoreFinish, debugStr);
// state change
......@@ -2069,21 +2077,13 @@ const char* syncStr(ESyncState state) {
static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) {
SyncLeaderTransfer* pSyncLeaderTransfer = syncLeaderTransferFromRpcMsg2(pRpcMsg);
/*
char host[128];
uint16_t port;
syncUtilU642Addr(pSyncLeaderTransfer->newLeaderId.addr, host, sizeof(host), &port);
sDebug("vgId:%d sync event, maybe leader transfer to %s:%d %lu", ths->vgId, host, port,
pSyncLeaderTransfer->newLeaderId.addr);
*/
sDebug("vgId:%d sync event begin leader transfer", ths->vgId);
sDebug("vgId:%d sync event currentTerm:%lu begin leader transfer", ths->vgId, ths->pRaftStore->currentTerm);
if (strcmp(pSyncLeaderTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 &&
pSyncLeaderTransfer->newNodeInfo.nodePort == ths->myNodeInfo.nodePort) {
sDebug("vgId:%d sync event maybe leader transfer to %s:%d %lu", ths->vgId,
pSyncLeaderTransfer->newNodeInfo.nodeFqdn, pSyncLeaderTransfer->newNodeInfo.nodePort,
pSyncLeaderTransfer->newLeaderId.addr);
sDebug("vgId:%d sync event currentTerm:%lu maybe leader transfer to %s:%d %lu", ths->vgId,
ths->pRaftStore->currentTerm, pSyncLeaderTransfer->newNodeInfo.nodeFqdn,
pSyncLeaderTransfer->newNodeInfo.nodePort, pSyncLeaderTransfer->newLeaderId.addr);
// reset elect timer now!
int32_t electMS = 1;
......@@ -2205,8 +2205,8 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) {
int32_t code = 0;
ESyncState state = flag;
sDebug("vgId:%d sync event commit by wal from index:%" PRId64 " to index:%" PRId64 ", %s", ths->vgId, beginIndex,
endIndex, syncUtilState2String(state));
sDebug("vgId:%d sync event currentTerm:%lu commit by wal from index:%" PRId64 " to index:%" PRId64 ", %s", ths->vgId,
ths->pRaftStore->currentTerm, beginIndex, endIndex, syncUtilState2String(state));
// execute fsm
if (ths->pFsm != NULL) {
......@@ -2254,8 +2254,8 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
ths->pFsm->FpRestoreFinishCb(ths->pFsm);
}
ths->restoreFinish = true;
sDebug("vgId:%d sync event restore finish, %s, index:%ld", ths->vgId, syncUtilState2String(ths->state),
pEntry->index);
sDebug("vgId:%d sync event currentTerm:%lu restore finish, %s, index:%ld", ths->vgId,
ths->pRaftStore->currentTerm, syncUtilState2String(ths->state), pEntry->index);
}
}
......
......@@ -15,6 +15,7 @@
#include "syncRaftLog.h"
#include "syncRaftCfg.h"
#include "syncRaftStore.h"
#include "wal.h"
// refactor, log[0 .. n] ==> log[m .. n]
......@@ -162,10 +163,10 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
walFsync(pWal, true);
sDebug("vgId:%d sync event write index:%ld, %s, isStandBy:%d, msgType:%s,%d, originalRpcType:%s,%d",
pData->pSyncNode->vgId, pEntry->index, syncUtilState2String(pData->pSyncNode->state),
pData->pSyncNode->pRaftCfg->isStandBy, TMSG_INFO(pEntry->msgType), pEntry->msgType,
TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType);
sDebug("vgId:%d sync event currentTerm:%lu write index:%ld, %s, isStandBy:%d, msgType:%s,%d, originalRpcType:%s,%d",
pData->pSyncNode->vgId, pData->pSyncNode->pRaftStore->currentTerm, pEntry->index,
syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->pRaftCfg->isStandBy,
TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType);
return code;
}
......@@ -321,7 +322,12 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
walFsync(pWal, true);
sDebug("sync event old write wal: %ld", pEntry->index);
sDebug(
"vgId:%d sync event currentTerm:%lu old write index:%ld, %s, isStandBy:%d, msgType:%s,%d, originalRpcType:%s,%d",
pData->pSyncNode->vgId, pData->pSyncNode->pRaftStore->currentTerm, pEntry->index,
syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->pRaftCfg->isStandBy, TMSG_INFO(pEntry->msgType),
pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType);
return code;
}
......
......@@ -14,6 +14,7 @@
*/
#include "syncRespMgr.h"
#include "syncRaftStore.h"
SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl) {
SSyncRespMgr *pObj = (SSyncRespMgr *)taosMemoryMalloc(sizeof(SSyncRespMgr));
......@@ -45,9 +46,9 @@ int64_t syncRespMgrAdd(SSyncRespMgr *pObj, SRespStub *pStub) {
taosHashPut(pObj->pRespHash, &keyCode, sizeof(keyCode), pStub, sizeof(SRespStub));
SSyncNode *pSyncNode = pObj->data;
sDebug("vgId:%d sync event resp mgr add, msgType:%s,%d seq:%lu handle:%p ahandle:%p", pSyncNode->vgId,
TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, keyCode, pStub->rpcMsg.info.handle,
pStub->rpcMsg.info.ahandle);
sDebug("vgId:%d sync event currentTerm:%lu resp mgr add, msgType:%s,%d seq:%lu handle:%p ahandle:%p", pSyncNode->vgId,
pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, keyCode,
pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle);
taosThreadMutexUnlock(&(pObj->mutex));
return keyCode;
......@@ -70,9 +71,9 @@ int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub) {
memcpy(pStub, pTmp, sizeof(SRespStub));
SSyncNode *pSyncNode = pObj->data;
sDebug("vgId:%d sync event resp mgr get, msgType:%s,%d seq:%lu handle:%p ahandle:%p", pSyncNode->vgId,
TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, index, pStub->rpcMsg.info.handle,
pStub->rpcMsg.info.ahandle);
sDebug("vgId:%d sync event currentTerm:%lu resp mgr get, msgType:%s,%d seq:%lu handle:%p ahandle:%p",
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType,
index, pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle);
taosThreadMutexUnlock(&(pObj->mutex));
return 1; // get one object
......@@ -89,9 +90,9 @@ int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStu
memcpy(pStub, pTmp, sizeof(SRespStub));
SSyncNode *pSyncNode = pObj->data;
sDebug("vgId:%d sync event resp mgr get and del, msgType:%s,%d seq:%lu handle:%p ahandle:%p", pSyncNode->vgId,
TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, index, pStub->rpcMsg.info.handle,
pStub->rpcMsg.info.ahandle);
sDebug("vgId:%d sync event currentTerm:%lu resp mgr get and del, msgType:%s,%d seq:%lu handle:%p ahandle:%p",
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType,
index, pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle);
taosHashRemove(pObj->pRespHash, &index, sizeof(index));
taosThreadMutexUnlock(&(pObj->mutex));
......
......@@ -141,18 +141,22 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) {
if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg);
sDebug(
"vgId:%d sync event snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu "
"vgId:%d sync event currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld "
"lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu send "
"msg:%s",
pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm, msgStr);
pSender->pSyncNode->vgId, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack,
pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex,
pSender->privateTerm, msgStr);
taosMemoryFree(msgStr);
} else {
sDebug(
"vgId:%d sync event snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu "
"vgId:%d sync event currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld "
"lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu",
pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm);
pSender->pSyncNode->vgId, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack,
pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex,
pSender->privateTerm);
}
syncSnapshotSendDestroy(pMsg);
......@@ -279,25 +283,31 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg);
sDebug(
"vgId:%d sync event snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu "
"vgId:%d sync event currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld "
"lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu send "
"msg:%s",
pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm, msgStr);
pSender->pSyncNode->vgId, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack,
pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex,
pSender->privateTerm, msgStr);
taosMemoryFree(msgStr);
} else {
sDebug(
"vgId:%d sync event snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu "
"vgId:%d sync event currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld "
"lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu",
pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm);
pSender->pSyncNode->vgId, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack,
pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex,
pSender->privateTerm);
}
} else {
sDebug(
"vgId:%d sync event snapshot send to %s:%d sending seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu "
"vgId:%d sync event currentTerm:%lu snapshot send to %s:%d sending seq:%d ack:%d lastApplyIndex:%ld "
"lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu",
pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm);
pSender->pSyncNode->vgId, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack,
pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex,
pSender->privateTerm);
}
syncSnapshotSendDestroy(pMsg);
......@@ -328,12 +338,15 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg);
sDebug("vgId:%d sync event snapshot send to %s:%d resend seq:%d ack:%d privateTerm:%lu send msg:%s",
pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->privateTerm, msgStr);
sDebug(
"vgId:%d sync event currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d privateTerm:%lu send msg:%s",
pSender->pSyncNode->vgId, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack,
pSender->privateTerm, msgStr);
taosMemoryFree(msgStr);
} else {
sDebug("vgId:%d sync event snapshot send to %s:%d resend seq:%d ack:%d privateTerm:%lu", pSender->pSyncNode->vgId,
host, port, pSender->seq, pSender->ack, pSender->privateTerm);
sDebug("vgId:%d sync event currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d privateTerm:%lu",
pSender->pSyncNode->vgId, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq,
pSender->ack, pSender->privateTerm);
}
syncSnapshotSendDestroy(pMsg);
......@@ -485,7 +498,7 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply) {
pReceiver->start = false;
if (apply) {
++(pReceiver->privateTerm);
// ++(pReceiver->privateTerm);
}
if (gRaftDetailLog) {
......@@ -566,17 +579,17 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg);
sDebug(
"vgId:%d sync event snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu, "
"vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s",
pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex,
pReceiver->privateTerm, msgStr);
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex,
pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr);
taosMemoryFree(msgStr);
} else {
sDebug(
"vgId:%d sync event snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu, "
"vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld privateTerm:%lu",
pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex,
pReceiver->privateTerm);
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex,
pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm);
}
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
......@@ -612,14 +625,19 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
bool isDrop;
if (IamInNew) {
sDebug("vgId:%d sync event update config by snapshot, lastIndex:%ld, lastTerm:%lu, lastConfigIndex:%ld ",
pSyncNode->vgId, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex);
sDebug(
"vgId:%d sync event currentTerm:%lu update config by snapshot, lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld ",
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pMsg->lastIndex, pMsg->lastTerm,
pMsg->lastConfigIndex);
syncNodeUpdateConfig(pSyncNode, &newSyncCfg, pMsg->lastConfigIndex, &isDrop);
} else {
sDebug(
"vgId:%d sync event do not update config by snapshot, I am not in newCfg, lastIndex:%ld, lastTerm:%lu, "
"vgId:%d sync event currentTerm:%lu do not update config by snapshot, I am not in newCfg, "
"lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld ",
pSyncNode->vgId, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex);
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pMsg->lastIndex, pMsg->lastTerm,
pMsg->lastConfigIndex);
}
// change isStandBy to normal
......@@ -644,19 +662,20 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if (gRaftDetailLog) {
char *logSimpleStr = logStoreSimple2Str(pSyncNode->pLogStore);
sDebug(
"vgId:%d sync event snapshot recv from %s:%d finish, update log begin index:%ld, "
"vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d finish, update log begin index:%ld, "
"snapshot.lastApplyIndex:%ld, "
"snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, privateTerm:%lu, raft log:%s",
pSyncNode->vgId, host, port, pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
snapshot.lastConfigIndex, pReceiver->privateTerm, logSimpleStr);
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, host, port, pMsg->lastIndex + 1,
snapshot.lastApplyIndex, snapshot.lastApplyTerm, snapshot.lastConfigIndex, pReceiver->privateTerm,
logSimpleStr);
taosMemoryFree(logSimpleStr);
} else {
sDebug(
"vgId:%d sync event snapshot recv from %s:%d finish, update log begin index:%ld, "
"vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d finish, update log begin index:%ld, "
"snapshot.lastApplyIndex:%ld, "
"snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, privateTerm:%lu",
pSyncNode->vgId, host, port, pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
snapshot.lastConfigIndex, pReceiver->privateTerm);
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, host, port, pMsg->lastIndex + 1,
snapshot.lastApplyIndex, snapshot.lastApplyTerm, snapshot.lastConfigIndex, pReceiver->privateTerm);
}
pReceiver->pWriter = NULL;
......@@ -667,17 +686,17 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg);
sDebug(
"vgId:%d sync event snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, "
"vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s",
pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm,
pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr);
pReceiver->pSyncNode->vgId, pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack,
pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr);
taosMemoryFree(msgStr);
} else {
sDebug(
"vgId:%d sync event snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, "
"vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu",
pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm,
pMsg->lastConfigIndex, pReceiver->privateTerm);
pReceiver->pSyncNode->vgId, pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack,
pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm);
}
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
......@@ -692,18 +711,20 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg);
sDebug(
"vgId:%d sync event snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, lastTerm:%lu, "
"vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, "
"lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu, recv "
"msg:%s",
pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm,
pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr);
pReceiver->pSyncNode->vgId, pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack,
pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr);
taosMemoryFree(msgStr);
} else {
sDebug(
"vgId:%d sync event snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, lastTerm:%lu, "
"vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, "
"lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu",
pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm,
pMsg->lastConfigIndex, pReceiver->privateTerm);
pReceiver->pSyncNode->vgId, pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack,
pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm);
}
} else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
......@@ -723,17 +744,19 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg);
sDebug(
"vgId:%d sync event snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, lastTerm:%lu, "
"vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, "
"lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s",
pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex,
pReceiver->privateTerm, msgStr);
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex,
pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr);
taosMemoryFree(msgStr);
} else {
sDebug(
"vgId:%d sync event snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, lastTerm:%lu, "
"vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, "
"lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu",
pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex,
pReceiver->privateTerm);
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex,
pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm);
}
} else {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册