提交 e9d466ec 编写于 作者: M Minghao Li

refactor(sync) delete some trace log

上级 73686254
......@@ -49,8 +49,8 @@ void raftStoreClearVote(SRaftStore *pRaftStore);
void raftStoreNextTerm(SRaftStore *pRaftStore);
void raftStoreSetTerm(SRaftStore *pRaftStore, SyncTerm term);
int32_t raftStoreFromJson(SRaftStore *pRaftStore, cJSON *pJson);
cJSON *raftStore2Json(SRaftStore *pRaftStore);
char *raftStore2Str(SRaftStore *pRaftStore);
cJSON * raftStore2Json(SRaftStore *pRaftStore);
char * raftStore2Str(SRaftStore *pRaftStore);
// for debug -------------------
void raftStorePrint(SRaftStore *pObj);
......
......@@ -39,8 +39,8 @@ typedef struct SSyncSnapshotSender {
bool start;
int32_t seq;
int32_t ack;
void *pReader;
void *pCurrentBlock;
void * pReader;
void * pCurrentBlock;
int32_t blockLen;
SSnapshot snapshot;
int64_t sendingMS;
......@@ -58,28 +58,29 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender);
void snapshotSenderStop(SSyncSnapshotSender *pSender);
int32_t snapshotSend(SSyncSnapshotSender *pSender);
int32_t snapshotReSend(SSyncSnapshotSender *pSender);
cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender);
char *snapshotSender2Str(SSyncSnapshotSender *pSender);
cJSON * snapshotSender2Json(SSyncSnapshotSender *pSender);
char * snapshotSender2Str(SSyncSnapshotSender *pSender);
typedef struct SSyncSnapshotReceiver {
bool start;
int32_t ack;
void *pWriter;
void * pWriter;
SyncTerm term;
SyncTerm privateTerm;
SSyncNode *pSyncNode;
int32_t replicaIndex;
SRaftId fromId;
} SSyncSnapshotReceiver;
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, int32_t replicaIndex);
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId);
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver);
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm);
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SRaftId fromId);
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver);
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply);
cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver);
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver);
cJSON * snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver);
char * snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver);
int32_t syncNodeOnSnapshotSendCb(SSyncNode *ths, SyncSnapshotSend *pMsg);
int32_t syncNodeOnSnapshotRspCb(SSyncNode *ths, SyncSnapshotRsp *pMsg);
......
......@@ -386,11 +386,13 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
}
}
char* sOld = syncCfg2Str(&oldSyncCfg);
char* sNew = syncCfg2Str(&newSyncCfg);
sInfo("==config change== 0x11 old:%s new:%s isDrop:%d \n", sOld, sNew, isDrop);
taosMemoryFree(sOld);
taosMemoryFree(sNew);
if (gRaftDetailLog) {
char* sOld = syncCfg2Str(&oldSyncCfg);
char* sNew = syncCfg2Str(&newSyncCfg);
sInfo("==config change== 0x11 old:%s new:%s isDrop:%d \n", sOld, sNew, isDrop);
taosMemoryFree(sOld);
taosMemoryFree(sNew);
}
}
// always call FpReConfigCb
......@@ -745,10 +747,13 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
// advance commit index to sanpshot first
SSnapshot snapshot;
ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot);
if (snapshot.lastApplyIndex > ths->commitIndex) {
sInfo("sync event vgId:%d commit by snapshot from index:%ld to index:%ld, %s", ths->vgId, ths->commitIndex,
snapshot.lastApplyIndex, syncUtilState2String(ths->state));
if (snapshot.lastApplyIndex >= 0 && snapshot.lastApplyIndex > ths->commitIndex) {
SyncIndex commitBegin = ths->commitIndex;
SyncIndex commitEnd = snapshot.lastApplyIndex;
ths->commitIndex = snapshot.lastApplyIndex;
sInfo("sync event vgId:%d commit by snapshot from index:%ld to index:%ld, %s", ths->vgId, commitBegin,
commitEnd, syncUtilState2String(ths->state));
}
SyncIndex beginIndex = ths->commitIndex + 1;
......
......@@ -121,7 +121,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
syncIndexMgrLog2("recv SyncAppendEntriesReply, before pNextIndex:", ths->pNextIndex);
syncIndexMgrLog2("recv SyncAppendEntriesReply, before pMatchIndex:", ths->pMatchIndex);
{
if (gRaftDetailLog) {
SSnapshot snapshot;
ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot);
sTrace("recv SyncAppendEntriesReply, before snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu",
......@@ -147,7 +147,10 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
if (pMsg->success) {
// nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1]
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), pMsg->matchIndex + 1);
sTrace("update next match, index:%ld, success:%d", pMsg->matchIndex + 1, pMsg->success);
if (gRaftDetailLog) {
sTrace("update next match, index:%ld, success:%d", pMsg->matchIndex + 1, pMsg->success);
}
// matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex]
syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex);
......@@ -159,7 +162,9 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
} else {
SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
sTrace("update next not match, begin, index:%ld, success:%d", nextIndex, pMsg->success);
if (gRaftDetailLog) {
sTrace("update next index not match, begin, index:%ld, success:%d", nextIndex, pMsg->success);
}
// notice! int64, uint64
if (nextIndex > SYNC_INDEX_BEGIN) {
......@@ -182,12 +187,19 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
uint16_t port;
syncUtilU642Addr(pSender->pSyncNode->replicasId[pSender->replicaIndex].addr, host, sizeof(host), &port);
char* s = snapshotSender2Str(pSender);
sInfo(
"sync event vgId:%d snapshot send to %s:%d start sender first time, lastApplyIndex:%ld lastApplyTerm:%lu "
"sender:%s",
ths->vgId, host, port, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, s);
taosMemoryFree(s);
if (gRaftDetailLog) {
char* s = snapshotSender2Str(pSender);
sInfo(
"sync event vgId:%d snapshot send to %s:%d start sender first time, lastApplyIndex:%ld lastApplyTerm:%lu "
"sender:%s",
ths->vgId, host, port, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, s);
taosMemoryFree(s);
} else {
sInfo(
"sync event vgId:%d snapshot send to %s:%d start sender first time, lastApplyIndex:%ld "
"lastApplyTerm:%lu",
ths->vgId, host, port, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm);
}
}
SyncIndex sentryIndex = pSender->snapshot.lastApplyIndex + 1;
......@@ -202,12 +214,14 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
}
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex);
sTrace("update next not match, end, index:%ld, success:%d", nextIndex, pMsg->success);
if (gRaftDetailLog) {
sTrace("update next index not match, end, index:%ld, success:%d", nextIndex, pMsg->success);
}
}
syncIndexMgrLog2("recv SyncAppendEntriesReply, after pNextIndex:", ths->pNextIndex);
syncIndexMgrLog2("recv SyncAppendEntriesReply, after pMatchIndex:", ths->pMatchIndex);
{
if (gRaftDetailLog) {
SSnapshot snapshot;
ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot);
sTrace("recv SyncAppendEntriesReply, after snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu",
......
......@@ -51,18 +51,25 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
// advance commit index to sanpshot first
SSnapshot snapshot;
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
if (snapshot.lastApplyIndex > pSyncNode->commitIndex) {
if (snapshot.lastApplyIndex > 0 && snapshot.lastApplyIndex > pSyncNode->commitIndex) {
SyncIndex commitBegin = pSyncNode->commitIndex;
SyncIndex commitEnd = snapshot.lastApplyIndex;
pSyncNode->commitIndex = snapshot.lastApplyIndex;
sInfo("sync event vgId:%d commit by snapshot from index:%ld to index:%ld, %s", pSyncNode->vgId,
pSyncNode->commitIndex, snapshot.lastApplyIndex, syncUtilState2String(pSyncNode->state));
pSyncNode->commitIndex = snapshot.lastApplyIndex;
}
// update commit index
SyncIndex newCommitIndex = pSyncNode->commitIndex;
for (SyncIndex index = syncNodeGetLastIndex(pSyncNode); index > pSyncNode->commitIndex; --index) {
bool agree = syncAgree(pSyncNode, index);
sTrace("syncMaybeAdvanceCommitIndex syncAgree:%d, index:%ld, pSyncNode->commitIndex:%ld", agree, index,
pSyncNode->commitIndex);
if (gRaftDetailLog) {
sTrace("syncMaybeAdvanceCommitIndex syncAgree:%d, index:%ld, pSyncNode->commitIndex:%ld", agree, index,
pSyncNode->commitIndex);
}
if (agree) {
// term
SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, index);
......@@ -72,16 +79,21 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
if (pEntry->term == pSyncNode->pRaftStore->currentTerm) {
// update commit index
newCommitIndex = index;
sTrace("syncMaybeAdvanceCommitIndex maybe to update, newCommitIndex:%ld commit, pSyncNode->commitIndex:%ld",
newCommitIndex, pSyncNode->commitIndex);
if (gRaftDetailLog) {
sTrace("syncMaybeAdvanceCommitIndex maybe to update, newCommitIndex:%ld commit, pSyncNode->commitIndex:%ld",
newCommitIndex, pSyncNode->commitIndex);
}
syncEntryDestory(pEntry);
break;
} else {
sTrace(
"syncMaybeAdvanceCommitIndex can not commit due to term not equal, pEntry->term:%lu, "
"pSyncNode->pRaftStore->currentTerm:%lu",
pEntry->term, pSyncNode->pRaftStore->currentTerm);
if (gRaftDetailLog) {
sTrace(
"syncMaybeAdvanceCommitIndex can not commit due to term not equal, pEntry->term:%lu, "
"pSyncNode->pRaftStore->currentTerm:%lu",
pEntry->term, pSyncNode->pRaftStore->currentTerm);
}
}
syncEntryDestory(pEntry);
......@@ -92,7 +104,9 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
SyncIndex beginIndex = pSyncNode->commitIndex + 1;
SyncIndex endIndex = newCommitIndex;
sTrace("syncMaybeAdvanceCommitIndex sync commit %ld", newCommitIndex);
if (gRaftDetailLog) {
sTrace("syncMaybeAdvanceCommitIndex sync commit %ld", newCommitIndex);
}
// update commit index
pSyncNode->commitIndex = newCommitIndex;
......
......@@ -40,7 +40,7 @@ int32_t syncEnvStart() {
// gSyncEnv = doSyncEnvStart(gSyncEnv);
gSyncEnv = doSyncEnvStart();
assert(gSyncEnv != NULL);
sTrace("syncEnvStart ok!");
sTrace("sync env start ok");
return ret;
}
......
......@@ -119,7 +119,7 @@ cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr) {
char *syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr) {
cJSON *pJson = syncIndexMgr2Json(pSyncIndexMgr);
char *serialized = cJSON_Print(pJson);
char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
......
......@@ -35,7 +35,7 @@
#include "syncVoteMgr.h"
#include "tref.h"
bool gRaftDetailLog = true;
bool gRaftDetailLog = false;
static int32_t tsNodeRefId = -1;
......@@ -87,7 +87,9 @@ int64_t syncOpen(const SSyncInfo* pSyncInfo) {
SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo);
assert(pSyncNode != NULL);
syncNodeLog2("syncNodeOpen open success", pSyncNode);
if (gRaftDetailLog) {
syncNodeLog2("syncNodeOpen open success", pSyncNode);
}
pSyncNode->rid = taosAddRef(tsNodeRefId, pSyncNode);
if (pSyncNode->rid < 0) {
......@@ -174,7 +176,10 @@ int32_t syncSetStandby(int64_t rid) {
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) {
int32_t ret = 0;
char* configChange = syncCfg2Str((SSyncCfg*)pSyncCfg);
sInfo("==syncReconfig== newconfig:%s", configChange);
if (gRaftDetailLog) {
sInfo("==syncReconfig== newconfig:%s", configChange);
}
SRpcMsg rpcMsg = {0};
rpcMsg.msgType = TDMT_SYNC_CONFIG_CHANGE;
......@@ -374,13 +379,14 @@ void setHeartbeatTimerMS(int64_t rid, int32_t hbTimerMS) {
}
int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
sTrace("syncPropose msgType:%d ", pMsg->msgType);
int32_t ret = TAOS_SYNC_PROPOSE_SUCCESS;
int32_t ret = TAOS_SYNC_PROPOSE_SUCCESS;
SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) return TAOS_SYNC_PROPOSE_OTHER_ERROR;
if (pSyncNode == NULL) {
return TAOS_SYNC_PROPOSE_OTHER_ERROR;
}
assert(rid == pSyncNode->rid);
sTrace("sync event vgId:%d propose msgType:%s", pSyncNode->vgId, TMSG_INFO(pMsg->msgType));
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
SRespStub stub;
......@@ -441,9 +447,11 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
assert(pSyncNode->pRaftCfg != NULL);
pSyncInfo->syncCfg = pSyncNode->pRaftCfg->cfg;
char* seralized = raftCfg2Str(pSyncNode->pRaftCfg);
sInfo("syncNodeOpen update config :%s", seralized);
taosMemoryFree(seralized);
if (gRaftDetailLog) {
char* seralized = raftCfg2Str(pSyncNode->pRaftCfg);
sInfo("syncNodeOpen update config :%s", seralized);
taosMemoryFree(seralized);
}
raftCfgClose(pSyncNode->pRaftCfg);
}
......@@ -614,7 +622,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
}
// snapshot receivers
pSyncNode->pNewNodeReceiver = snapshotReceiverCreate(pSyncNode, 100);
pSyncNode->pNewNodeReceiver = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID);
// start in syncNodeStart
// start raft
......@@ -632,49 +640,28 @@ void syncNodeStart(SSyncNode* pSyncNode) {
raftStoreNextTerm(pSyncNode->pRaftStore);
syncNodeBecomeLeader(pSyncNode, "one replica start");
syncNodeLog2("==state change become leader immediately==", pSyncNode);
// Raft 3.6.2 Committing entries from previous terms
// use this now
syncNodeAppendNoop(pSyncNode);
syncMaybeAdvanceCommitIndex(pSyncNode); // maybe only one replica
/*
sInfo("==syncNodeStart== RestoreFinish begin 1 replica tsem_wait %p", pSyncNode);
tsem_wait(&pSyncNode->restoreSem);
sInfo("==syncNodeStart== RestoreFinish end 1 replica tsem_wait %p", pSyncNode);
*/
/*
while (pSyncNode->restoreFinish != true) {
taosMsleep(10);
if (gRaftDetailLog) {
syncNodeLog2("==state change become leader immediately==", pSyncNode);
}
*/
sInfo("==syncNodeStart== restoreFinish ok 1 replica %p vgId:%d", pSyncNode, pSyncNode->vgId);
return;
}
syncNodeBecomeFollower(pSyncNode, "first start");
// for test
int32_t ret = 0;
// int32_t ret = 0;
// ret = syncNodeStartPingTimer(pSyncNode);
assert(ret == 0);
/*
sInfo("==syncNodeStart== RestoreFinish begin multi replica tsem_wait %p", pSyncNode);
tsem_wait(&pSyncNode->restoreSem);
sInfo("==syncNodeStart== RestoreFinish end multi replica tsem_wait %p", pSyncNode);
*/
// assert(ret == 0);
/*
while (pSyncNode->restoreFinish != true) {
taosMsleep(10);
if (gRaftDetailLog) {
syncNodeLog2("==state change become leader immediately==", pSyncNode);
}
*/
sInfo("==syncNodeStart== restoreFinish ok multi replica %p vgId:%d", pSyncNode, pSyncNode->vgId);
}
void syncNodeStartStandBy(SSyncNode* pSyncNode) {
......@@ -1135,7 +1122,10 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, bool* isDro
}
raftCfgPersist(pSyncNode->pRaftCfg);
syncNodeLog2("==syncNodeUpdateConfig==", pSyncNode);
if (gRaftDetailLog) {
syncNodeLog2("==syncNodeUpdateConfig==", pSyncNode);
}
}
SSyncNode* syncNodeAcquire(int64_t rid) {
......@@ -1475,9 +1465,11 @@ void syncNodeLog(SSyncNode* pObj) {
}
void syncNodeLog2(char* s, SSyncNode* pObj) {
char* serialized = syncNode2Str(pObj);
sTraceLong("syncNodeLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
taosMemoryFree(serialized);
if (gRaftDetailLog) {
char* serialized = syncNode2Str(pObj);
sTraceLong("syncNodeLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
taosMemoryFree(serialized);
}
}
// ------ local funciton ---------
......@@ -1807,11 +1799,13 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
}
}
char* sOld = syncCfg2Str(&oldSyncCfg);
char* sNew = syncCfg2Str(&newSyncCfg);
sInfo("==config change== 0x11 old:%s new:%s isDrop:%d \n", sOld, sNew, isDrop);
taosMemoryFree(sOld);
taosMemoryFree(sNew);
if (gRaftDetailLog) {
char* sOld = syncCfg2Str(&oldSyncCfg);
char* sNew = syncCfg2Str(&newSyncCfg);
sInfo("==config change== 0x11 old:%s new:%s isDrop:%d \n", sOld, sNew, isDrop);
taosMemoryFree(sOld);
taosMemoryFree(sNew);
}
}
// always call FpReConfigCb
......@@ -1834,7 +1828,7 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
ths->pFsm->FpRestoreFinishCb(ths->pFsm);
}
ths->restoreFinish = true;
sInfo("restore finish %p vgId:%d", ths, ths->vgId);
sInfo("sync event vgId:%d restore finish", ths->vgId);
}
}
......
......@@ -122,7 +122,7 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) {
syncIndexMgrLog2("begin append entries peers pNextIndex:", pSyncNode->pNextIndex);
syncIndexMgrLog2("begin append entries peers pMatchIndex:", pSyncNode->pMatchIndex);
logStoreSimpleLog2("begin append entries peers LogStore:", pSyncNode->pLogStore);
{
if (gRaftDetailLog) {
SSnapshot snapshot;
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
sTrace("begin append entries peers, snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu",
......@@ -201,7 +201,6 @@ int32_t syncNodeReplicate(SSyncNode* pSyncNode) {
}
int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg) {
sTrace("syncNodeAppendEntries pSyncNode:%p ", pSyncNode);
int32_t ret = 0;
SRpcMsg rpcMsg;
......
......@@ -20,7 +20,7 @@
#include "syncUtil.h"
#include "wal.h"
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm);
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SRaftId fromId);
//----------------------------------
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) {
......@@ -105,15 +105,23 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) {
syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);
char *msgStr = syncSnapshotSend2Str(pMsg);
char host[128];
uint16_t port;
syncUtilU642Addr(pSender->pSyncNode->replicasId[pSender->replicaIndex].addr, host, sizeof(host), &port);
sTrace(
"sync event vgId:%d snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu send msg:%s",
pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
pSender->snapshot.lastApplyTerm, msgStr);
taosMemoryFree(msgStr);
if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg);
sTrace(
"sync event vgId:%d snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu send "
"msg:%s",
pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
pSender->snapshot.lastApplyTerm, msgStr);
taosMemoryFree(msgStr);
} else {
sTrace("sync event vgId:%d snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu",
pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
pSender->snapshot.lastApplyTerm);
}
syncSnapshotSendDestroy(pMsg);
}
......@@ -185,9 +193,11 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender) {
pSender->start = false;
char *s = snapshotSender2Str(pSender);
sInfo("snapshotSenderStop %s", s);
taosMemoryFree(s);
if (gRaftDetailLog) {
char *s = snapshotSender2Str(pSender);
sInfo("snapshotSenderStop %s", s);
taosMemoryFree(s);
}
}
// when sender receiver ack, call this function to send msg from seq
......@@ -227,24 +237,29 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);
char *msgStr = syncSnapshotSend2Str(pMsg);
char host[128];
uint16_t port;
syncUtilU642Addr(pSender->pSyncNode->replicasId[pSender->replicaIndex].addr, host, sizeof(host), &port);
if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) {
sTrace(
"sync event vgId:%d snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu send "
"msg:%s",
pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
pSender->snapshot.lastApplyTerm, msgStr);
if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg);
sTrace(
"sync event vgId:%d snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu send "
"msg:%s",
pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
pSender->snapshot.lastApplyTerm, msgStr);
taosMemoryFree(msgStr);
} else {
sTrace("sync event vgId:%d snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu",
pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
pSender->snapshot.lastApplyTerm);
}
} else {
sTrace(
"sync event vgId:%d snapshot send to %s:%d sending seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu send "
"msg:%s",
pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
pSender->snapshot.lastApplyTerm, msgStr);
sTrace("sync event vgId:%d snapshot send to %s:%d sending seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu",
pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
pSender->snapshot.lastApplyTerm);
}
taosMemoryFree(msgStr);
syncSnapshotSendDestroy(pMsg);
return 0;
......@@ -266,13 +281,19 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
syncSnapshotSend2RpcMsg(pMsg, &rpcMsg);
syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);
char *msgStr = syncSnapshotSend2Str(pMsg);
char host[128];
uint16_t port;
syncUtilU642Addr(pSender->pSyncNode->replicasId[pSender->replicaIndex].addr, host, sizeof(host), &port);
sTrace("sync event vgId:%d snapshot send to %s:%d resend seq:%d ack:%d send msg:%s", pSender->pSyncNode->vgId, host,
port, pSender->seq, pSender->ack, msgStr);
taosMemoryFree(msgStr);
if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg);
sTrace("sync event vgId:%d snapshot send to %s:%d resend seq:%d ack:%d send msg:%s", pSender->pSyncNode->vgId,
host, port, pSender->seq, pSender->ack, msgStr);
taosMemoryFree(msgStr);
} else {
sTrace("sync event vgId:%d snapshot send to %s:%d resend seq:%d ack:%d", pSender->pSyncNode->vgId, host, port,
pSender->seq, pSender->ack);
}
syncSnapshotSendDestroy(pMsg);
}
......@@ -337,7 +358,7 @@ char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
}
// -------------------------------------
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, int32_t replicaIndex) {
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId) {
bool condition = (pSyncNode->pFsm->FpSnapshotStartWrite != NULL) && (pSyncNode->pFsm->FpSnapshotStopWrite != NULL) &&
(pSyncNode->pFsm->FpSnapshotDoWrite != NULL);
......@@ -351,7 +372,7 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, int32_t repl
pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
pReceiver->pWriter = NULL;
pReceiver->pSyncNode = pSyncNode;
pReceiver->replicaIndex = replicaIndex;
pReceiver->fromId = fromId;
pReceiver->term = pSyncNode->pRaftStore->currentTerm;
pReceiver->privateTerm = 0;
......@@ -371,10 +392,11 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) { return pReceiver->start; }
// begin receive snapshot msg (current term, seq begin)
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm) {
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SRaftId fromId) {
pReceiver->term = pReceiver->pSyncNode->pRaftStore->currentTerm;
pReceiver->privateTerm = privateTerm;
pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
pReceiver->fromId = fromId;
ASSERT(pReceiver->pWriter == NULL);
int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm, &(pReceiver->pWriter));
......@@ -383,14 +405,15 @@ static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm p
// if receiver receive msg from seq = SYNC_SNAPSHOT_SEQ_BEGIN, start receiver
// if already start, force close, start again
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm) {
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SRaftId fromId) {
if (!snapshotReceiverIsStart(pReceiver)) {
// start
snapshotReceiverDoStart(pReceiver, privateTerm);
snapshotReceiverDoStart(pReceiver, privateTerm, fromId);
pReceiver->start = true;
} else {
// already start
sInfo("snapshot recv, receiver already start");
// force close, abandon incomplete data
int32_t ret =
......@@ -399,15 +422,15 @@ void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTer
pReceiver->pWriter = NULL;
// start again
snapshotReceiverDoStart(pReceiver, privateTerm);
snapshotReceiverDoStart(pReceiver, privateTerm, fromId);
pReceiver->start = true;
ASSERT(0);
}
char *s = snapshotReceiver2Str(pReceiver);
sInfo("snapshotReceiverStart %s", s);
taosMemoryFree(s);
if (gRaftDetailLog) {
char *s = snapshotReceiver2Str(pReceiver);
sInfo("snapshotReceiverStart %s", s);
taosMemoryFree(s);
}
}
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply) {
......@@ -424,9 +447,11 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply) {
++(pReceiver->privateTerm);
}
char *s = snapshotReceiver2Str(pReceiver);
sInfo("snapshotReceiverStop %s", s);
taosMemoryFree(s);
if (gRaftDetailLog) {
char *s = snapshotReceiver2Str(pReceiver);
sInfo("snapshotReceiverStop %s", s);
taosMemoryFree(s);
}
}
cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
......@@ -442,7 +467,22 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
snprintf(u64buf, sizeof(u64buf), "%p", pReceiver->pSyncNode);
cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
cJSON_AddNumberToObject(pRoot, "replicaIndex", pReceiver->replicaIndex);
cJSON *pFromId = cJSON_CreateObject();
snprintf(u64buf, sizeof(u64buf), "%lu", pReceiver->fromId.addr);
cJSON_AddStringToObject(pFromId, "addr", u64buf);
{
uint64_t u64 = pReceiver->fromId.addr;
cJSON *pTmp = pFromId;
char host[128] = {0};
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
cJSON_AddStringToObject(pTmp, "addr_host", host);
cJSON_AddNumberToObject(pTmp, "addr_port", port);
}
cJSON_AddNumberToObject(pFromId, "vgId", pReceiver->fromId.vgId);
cJSON_AddItemToObject(pRoot, "fromId", pFromId);
snprintf(u64buf, sizeof(u64buf), "%lu", pReceiver->term);
cJSON_AddStringToObject(pRoot, "term", u64buf);
......@@ -474,17 +514,23 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
// begin
snapshotReceiverStart(pReceiver, pMsg->privateTerm);
snapshotReceiverStart(pReceiver, pMsg->privateTerm, pMsg->srcId);
pReceiver->ack = pMsg->seq;
needRsp = true;
char *msgStr = syncSnapshotSend2Str(pMsg);
char host[128];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sTrace("sync event vgId:%d snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s",
pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr);
taosMemoryFree(msgStr);
if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg);
sTrace("sync event vgId:%d snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s",
pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr);
taosMemoryFree(msgStr);
} else {
sTrace("sync event vgId:%d snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu",
pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm);
}
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
// end, finish FSM
......@@ -492,31 +538,46 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
ASSERT(writeCode == 0);
pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, true);
pSyncNode->pLogStore->syncLogSetBeginIndex(pSyncNode->pLogStore, pMsg->lastIndex + 1);
char *logSimpleStr = logStoreSimple2Str(pSyncNode->pLogStore);
SSnapshot snapshot;
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
char host[128];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sInfo(
"sync event vgId:%d snapshot recv from %s:%d finish, update log begin index:%ld, "
"snapshot.lastApplyIndex:%ld, "
"snapshot.lastApplyTerm:%lu, raft log:%s",
pSyncNode->vgId, host, port, pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
logSimpleStr);
taosMemoryFree(logSimpleStr);
if (gRaftDetailLog) {
char *logSimpleStr = logStoreSimple2Str(pSyncNode->pLogStore);
sInfo(
"sync event vgId:%d snapshot recv from %s:%d finish, update log begin index:%ld, "
"snapshot.lastApplyIndex:%ld, "
"snapshot.lastApplyTerm:%lu, raft log:%s",
pSyncNode->vgId, host, port, pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
logSimpleStr);
taosMemoryFree(logSimpleStr);
} else {
sInfo(
"sync event vgId:%d snapshot recv from %s:%d finish, update log begin index:%ld, "
"snapshot.lastApplyIndex:%ld, "
"snapshot.lastApplyTerm:%lu",
pSyncNode->vgId, host, port, pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
}
pReceiver->pWriter = NULL;
snapshotReceiverStop(pReceiver, true);
pReceiver->ack = pMsg->seq;
needRsp = true;
char *msgStr = syncSnapshotSend2Str(pMsg);
sTrace("sync event vgId:%d snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s",
pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr);
taosMemoryFree(msgStr);
if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg);
sTrace("sync event vgId:%d snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s",
pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr);
taosMemoryFree(msgStr);
} else {
sTrace("sync event vgId:%d snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu",
pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm);
}
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, false);
......@@ -527,12 +588,17 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char *msgStr = syncSnapshotSend2Str(pMsg);
sTrace(
"sync event vgId:%d snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s",
pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr);
taosMemoryFree(msgStr);
if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg);
sTrace(
"sync event vgId:%d snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, lastTerm:%lu, recv "
"msg:%s",
pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr);
taosMemoryFree(msgStr);
} else {
sTrace("sync event vgId:%d snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, lastTerm:%lu",
pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm);
}
} else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
// transfering
......@@ -544,13 +610,20 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
}
needRsp = true;
char *msgStr = syncSnapshotSend2Str(pMsg);
char host[128];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sTrace("sync event vgId:%d snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s",
pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr);
taosMemoryFree(msgStr);
if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg);
sTrace(
"sync event vgId:%d snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s",
pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr);
taosMemoryFree(msgStr);
} else {
sTrace("sync event vgId:%d snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, lastTerm:%lu",
pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm);
}
} else {
ASSERT(0);
......
......@@ -41,7 +41,11 @@ SSyncSnapshotReceiver* createReceiver() {
pSyncNode->pFsm->FpSnapshotStopWrite = SnapshotStopWrite;
pSyncNode->pFsm->FpSnapshotDoWrite = SnapshotDoWrite;
SSyncSnapshotReceiver* pReceiver = snapshotReceiverCreate(pSyncNode, 2);
SRaftId id;
id.addr = syncUtilAddr2U64("1.2.3.4", 99);
id.vgId = 100;
SSyncSnapshotReceiver* pReceiver = snapshotReceiverCreate(pSyncNode, id);
pReceiver->start = true;
pReceiver->ack = 20;
pReceiver->pWriter = (void*)0x11;
......
......@@ -235,7 +235,6 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal*
}
}
int64_t rid = syncOpen(&syncInfo);
assert(rid > 0);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册