提交 b2477629 编写于 作者: M Minghao Li

refactor(sync) refactor trace log

上级 ffd69aa3
......@@ -195,6 +195,7 @@ int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRp
int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg);
cJSON* syncNode2Json(const SSyncNode* pSyncNode);
char* syncNode2Str(const SSyncNode* pSyncNode);
void syncNodeEventLog(const SSyncNode* pSyncNode, char* str);
char* syncNode2SimpleStr(const SSyncNode* pSyncNode);
bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config);
void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex, bool* isDrop);
......
......@@ -51,6 +51,7 @@ int32_t raftCfgAddConfigIndex(SRaftCfg *pRaftCfg, SyncIndex configIndex);
cJSON *syncCfg2Json(SSyncCfg *pSyncCfg);
char *syncCfg2Str(SSyncCfg *pSyncCfg);
char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg);
int32_t syncCfgFromJson(const cJSON *pRoot, SSyncCfg *pSyncCfg);
int32_t syncCfgFromStr(const char *s, SSyncCfg *pSyncCfg);
......@@ -72,6 +73,7 @@ void syncCfgPrint(SSyncCfg *pCfg);
void syncCfgPrint2(char *s, SSyncCfg *pCfg);
void syncCfgLog(SSyncCfg *pCfg);
void syncCfgLog2(char *s, SSyncCfg *pCfg);
void syncCfgLog3(char *s, SSyncCfg *pCfg);
void raftCfgPrint(SRaftCfg *pCfg);
void raftCfgPrint2(char *s, SRaftCfg *pCfg);
......
......@@ -39,8 +39,8 @@ typedef struct SSyncSnapshotSender {
bool start;
int32_t seq;
int32_t ack;
void * pReader;
void * pCurrentBlock;
void *pReader;
void *pCurrentBlock;
int32_t blockLen;
SSnapshot snapshot;
SSyncCfg lastConfig;
......@@ -59,14 +59,15 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender);
void snapshotSenderStop(SSyncSnapshotSender *pSender);
int32_t snapshotSend(SSyncSnapshotSender *pSender);
int32_t snapshotReSend(SSyncSnapshotSender *pSender);
cJSON * snapshotSender2Json(SSyncSnapshotSender *pSender);
char * snapshotSender2Str(SSyncSnapshotSender *pSender);
cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender);
char *snapshotSender2Str(SSyncSnapshotSender *pSender);
char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event);
typedef struct SSyncSnapshotReceiver {
bool start;
int32_t ack;
void * pWriter;
void *pWriter;
SyncTerm term;
SyncTerm privateTerm;
......@@ -80,8 +81,9 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver)
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SRaftId fromId);
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver);
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply);
cJSON * snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver);
char * snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver);
cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver);
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver);
char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event);
int32_t syncNodeOnSnapshotSendCb(SSyncNode *ths, SyncSnapshotSend *pMsg);
int32_t syncNodeOnSnapshotRspCb(SSyncNode *ths, SyncSnapshotRsp *pMsg);
......
......@@ -329,358 +329,6 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
return ret;
}
#if 0
int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
int32_t ret = 0;
char logBuf[128] = {0};
snprintf(logBuf, sizeof(logBuf), "==syncNodeOnAppendEntriesCb== term:%lu", ths->pRaftStore->currentTerm);
syncAppendEntriesLog2(logBuf, pMsg);
if (pMsg->term > ths->pRaftStore->currentTerm) {
syncNodeUpdateTerm(ths, pMsg->term);
}
assert(pMsg->term <= ths->pRaftStore->currentTerm);
// reset elect timer
if (pMsg->term == ths->pRaftStore->currentTerm) {
ths->leaderCache = pMsg->srcId;
syncNodeResetElectTimer(ths);
}
assert(pMsg->dataLen >= 0);
SyncTerm localPreLogTerm = 0;
if (pMsg->prevLogIndex >= SYNC_INDEX_BEGIN && pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) {
SSyncRaftEntry* pEntry = ths->pLogStore->getEntry(ths->pLogStore, pMsg->prevLogIndex);
assert(pEntry != NULL);
localPreLogTerm = pEntry->term;
syncEntryDestory(pEntry);
}
bool logOK =
(pMsg->prevLogIndex == SYNC_INDEX_INVALID) ||
((pMsg->prevLogIndex >= SYNC_INDEX_BEGIN) &&
(pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) && (pMsg->prevLogTerm == localPreLogTerm));
// reject request
if ((pMsg->term < ths->pRaftStore->currentTerm) ||
((pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && !logOK)) {
sTrace(
"syncNodeOnAppendEntriesCb --> reject, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, ths->state:%d, "
"logOK:%d",
pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK);
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
pReply->srcId = ths->myRaftId;
pReply->destId = pMsg->srcId;
pReply->term = ths->pRaftStore->currentTerm;
pReply->success = false;
pReply->matchIndex = SYNC_INDEX_INVALID;
SRpcMsg rpcMsg;
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
syncAppendEntriesReplyDestroy(pReply);
return ret;
}
// return to follower state
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE) {
sTrace(
"syncNodeOnAppendEntriesCb --> return to follower, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, "
"ths->state:%d, logOK:%d",
pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK);
syncNodeBecomeFollower(ths, "from candidate by append entries");
// ret or reply?
return ret;
}
// accept request
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_FOLLOWER && logOK) {
// preIndex = -1, or has preIndex entry in local log
assert(pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore));
// has extra entries (> preIndex) in local log
bool hasExtraEntries = pMsg->prevLogIndex < ths->pLogStore->getLastIndex(ths->pLogStore);
// has entries in SyncAppendEntries msg
bool hasAppendEntries = pMsg->dataLen > 0;
sTrace(
"syncNodeOnAppendEntriesCb --> accept, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, ths->state:%d, "
"logOK:%d, hasExtraEntries:%d, hasAppendEntries:%d",
pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK, hasExtraEntries, hasAppendEntries);
if (hasExtraEntries && hasAppendEntries) {
// not conflict by default
bool conflict = false;
SyncIndex extraIndex = pMsg->prevLogIndex + 1;
SSyncRaftEntry* pExtraEntry = ths->pLogStore->getEntry(ths->pLogStore, extraIndex);
assert(pExtraEntry != NULL);
SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
assert(pAppendEntry != NULL);
// log not match, conflict
assert(extraIndex == pAppendEntry->index);
if (pExtraEntry->term != pAppendEntry->term) {
conflict = true;
}
if (conflict) {
// roll back
SyncIndex delBegin = ths->pLogStore->getLastIndex(ths->pLogStore);
SyncIndex delEnd = extraIndex;
sTrace("syncNodeOnAppendEntriesCb --> conflict:%d, delBegin:%ld, delEnd:%ld", conflict, delBegin, delEnd);
// notice! reverse roll back!
for (SyncIndex index = delEnd; index >= delBegin; --index) {
if (ths->pFsm->FpRollBackCb != NULL) {
SSyncRaftEntry* pRollBackEntry = ths->pLogStore->getEntry(ths->pLogStore, index);
assert(pRollBackEntry != NULL);
// if (pRollBackEntry->msgType != TDMT_SYNC_NOOP) {
if (syncUtilUserRollback(pRollBackEntry->msgType)) {
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg);
SFsmCbMeta cbMeta;
cbMeta.index = pRollBackEntry->index;
cbMeta.isWeak = pRollBackEntry->isWeak;
cbMeta.code = 0;
cbMeta.state = ths->state;
cbMeta.seqNum = pRollBackEntry->seqNum;
ths->pFsm->FpRollBackCb(ths->pFsm, &rpcMsg, cbMeta);
rpcFreeCont(rpcMsg.pCont);
}
syncEntryDestory(pRollBackEntry);
}
}
// delete confict entries
ths->pLogStore->truncate(ths->pLogStore, extraIndex);
// append new entries
ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry);
// pre commit
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
if (ths->pFsm != NULL) {
// if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_SYNC_NOOP) {
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) {
SFsmCbMeta cbMeta;
cbMeta.index = pAppendEntry->index;
cbMeta.isWeak = pAppendEntry->isWeak;
cbMeta.code = 2;
cbMeta.state = ths->state;
cbMeta.seqNum = pAppendEntry->seqNum;
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta);
}
}
rpcFreeCont(rpcMsg.pCont);
}
// free memory
syncEntryDestory(pExtraEntry);
syncEntryDestory(pAppendEntry);
} else if (hasExtraEntries && !hasAppendEntries) {
// do nothing
} else if (!hasExtraEntries && hasAppendEntries) {
SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
assert(pAppendEntry != NULL);
// append new entries
ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry);
// pre commit
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
if (ths->pFsm != NULL) {
// if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_SYNC_NOOP) {
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) {
SFsmCbMeta cbMeta;
cbMeta.index = pAppendEntry->index;
cbMeta.isWeak = pAppendEntry->isWeak;
cbMeta.code = 3;
cbMeta.state = ths->state;
cbMeta.seqNum = pAppendEntry->seqNum;
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta);
}
}
rpcFreeCont(rpcMsg.pCont);
// free memory
syncEntryDestory(pAppendEntry);
} else if (!hasExtraEntries && !hasAppendEntries) {
// do nothing
} else {
assert(0);
}
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
pReply->srcId = ths->myRaftId;
pReply->destId = pMsg->srcId;
pReply->term = ths->pRaftStore->currentTerm;
pReply->success = true;
if (hasAppendEntries) {
pReply->matchIndex = pMsg->prevLogIndex + 1;
} else {
pReply->matchIndex = pMsg->prevLogIndex;
}
SRpcMsg rpcMsg;
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
syncAppendEntriesReplyDestroy(pReply);
// maybe update commit index from leader
if (pMsg->commitIndex > ths->commitIndex) {
// has commit entry in local
if (pMsg->commitIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) {
SyncIndex beginIndex = ths->commitIndex + 1;
SyncIndex endIndex = pMsg->commitIndex;
// update commit index
ths->commitIndex = pMsg->commitIndex;
// call back Wal
ths->pLogStore->updateCommitIndex(ths->pLogStore, ths->commitIndex);
// execute fsm
if (ths->pFsm != NULL) {
for (SyncIndex i = beginIndex; i <= endIndex; ++i) {
if (i != SYNC_INDEX_INVALID) {
SSyncRaftEntry* pEntry = ths->pLogStore->getEntry(ths->pLogStore, i);
assert(pEntry != NULL);
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg);
if (ths->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) {
SFsmCbMeta cbMeta;
cbMeta.index = pEntry->index;
cbMeta.isWeak = pEntry->isWeak;
cbMeta.code = 0;
cbMeta.state = ths->state;
cbMeta.seqNum = pEntry->seqNum;
cbMeta.term = pEntry->term;
cbMeta.currentTerm = ths->pRaftStore->currentTerm;
cbMeta.flag = 0x11;
SSnapshot snapshot;
ASSERT(ths->pFsm->FpGetSnapshot != NULL);
ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot);
bool needExecute = true;
if (cbMeta.index <= snapshot.lastApplyIndex) {
needExecute = false;
}
if (needExecute) {
ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta);
}
}
// config change
if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
SSyncCfg oldSyncCfg = ths->pRaftCfg->cfg;
SSyncCfg newSyncCfg;
int32_t ret = syncCfgFromStr(rpcMsg.pCont, &newSyncCfg);
ASSERT(ret == 0);
// update new config myIndex
bool hit = false;
for (int i = 0; i < newSyncCfg.replicaNum; ++i) {
if (strcmp(ths->myNodeInfo.nodeFqdn, (newSyncCfg.nodeInfo)[i].nodeFqdn) == 0 &&
ths->myNodeInfo.nodePort == (newSyncCfg.nodeInfo)[i].nodePort) {
newSyncCfg.myIndex = i;
hit = true;
break;
}
}
SReConfigCbMeta cbMeta = {0};
bool isDrop;
// I am in newConfig
if (hit) {
syncNodeUpdateConfig(ths, &newSyncCfg, pEntry->index, &isDrop);
// change isStandBy to normal
if (!isDrop) {
if (ths->state == TAOS_SYNC_STATE_LEADER) {
syncNodeBecomeLeader(ths, "config change");
} else {
syncNodeBecomeFollower(ths, "config change");
}
}
if (gRaftDetailLog) {
char* sOld = syncCfg2Str(&oldSyncCfg);
char* sNew = syncCfg2Str(&newSyncCfg);
sInfo("==config change== 0x11 old:%s new:%s isDrop:%d \n", sOld, sNew, isDrop);
taosMemoryFree(sOld);
taosMemoryFree(sNew);
}
}
// always call FpReConfigCb
if (ths->pFsm->FpReConfigCb != NULL) {
cbMeta.code = 0;
cbMeta.currentTerm = ths->pRaftStore->currentTerm;
cbMeta.index = pEntry->index;
cbMeta.term = pEntry->term;
cbMeta.newCfg = newSyncCfg;
cbMeta.oldCfg = oldSyncCfg;
cbMeta.seqNum = pEntry->seqNum;
cbMeta.flag = 0x11;
cbMeta.isDrop = isDrop;
ths->pFsm->FpReConfigCb(ths->pFsm, &rpcMsg, cbMeta);
}
}
// restore finish
if (pEntry->index == ths->pLogStore->getLastIndex(ths->pLogStore)) {
if (ths->restoreFinish == false) {
if (ths->pFsm->FpRestoreFinishCb != NULL) {
ths->pFsm->FpRestoreFinishCb(ths->pFsm);
}
ths->restoreFinish = true;
sInfo("==syncNodeOnAppendEntriesCb== restoreFinish set true %p vgId:%d", ths, ths->vgId);
/*
tsem_post(&ths->restoreSem);
sInfo("==syncNodeOnAppendEntriesCb== RestoreFinish tsem_post %p", ths);
*/
}
}
rpcFreeCont(rpcMsg.pCont);
syncEntryDestory(pEntry);
}
}
}
}
}
}
return ret;
}
#endif
static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) {
int32_t code;
......@@ -717,8 +365,10 @@ static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) {
// delete confict entries
code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin);
ASSERT(code == 0);
sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu log truncate, from %ld to %ld", ths->vgId,
syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, delBegin, delEnd);
char eventLog[128];
snprintf(eventLog, sizeof(eventLog), "log truncate, from %ld to %ld", delBegin, delEnd);
syncNodeEventLog(ths, eventLog);
logStoreSimpleLog2("after syncNodeMakeLogSame", ths->pLogStore);
return code;
......@@ -1066,10 +716,10 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
SyncIndex commitEnd = snapshot.lastApplyIndex;
ths->commitIndex = snapshot.lastApplyIndex;
sDebug(
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu commit by snapshot from index:%ld to index:%ld",
ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm,
commitBegin, commitEnd);
char eventLog[128];
snprintf(eventLog, sizeof(eventLog), "commit by snapshot from index:%ld to index:%ld", commitBegin,
commitEnd);
syncNodeEventLog(ths, eventLog);
}
SyncIndex beginIndex = ths->commitIndex + 1;
......
......@@ -183,31 +183,9 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
pMsg->privateTerm < pSender->privateTerm) {
snapshotSenderStart(pSender);
char host[128];
uint16_t port;
syncUtilU642Addr(pSender->pSyncNode->replicasId[pSender->replicaIndex].addr, host, sizeof(host), &port);
if (gRaftDetailLog) {
char* s = snapshotSender2Str(pSender);
sDebug(
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d start sender first time, "
"lastApplyIndex:%ld "
"lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu "
"sender:%s",
ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, host, port,
pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex,
pSender->privateTerm, s);
taosMemoryFree(s);
} else {
sDebug(
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d start sender first time, "
"lastApplyIndex:%ld "
"lastApplyTerm:%lu lastConfigIndex:%ld privateTerm:%lu",
ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, host, port,
pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex,
pSender->privateTerm);
}
char* eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender start");
syncNodeEventLog(ths, eventLog);
taosMemoryFree(eventLog);
}
SyncIndex sentryIndex = pSender->snapshot.lastApplyIndex + 1;
......
......@@ -56,9 +56,10 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
SyncIndex commitEnd = snapshot.lastApplyIndex;
pSyncNode->commitIndex = snapshot.lastApplyIndex;
sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu commit by snapshot from index:%ld to index:%ld",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, snapshot.lastApplyIndex);
char eventLog[128];
snprintf(eventLog, sizeof(eventLog), "commit by snapshot from index:%ld to index:%ld", pSyncNode->commitIndex,
snapshot.lastApplyIndex);
syncNodeEventLog(pSyncNode, eventLog);
}
// update commit index
......
......@@ -189,24 +189,6 @@ int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg
int32_t ret = 0;
bool IamInNew = syncNodeInConfig(pSyncNode, pNewCfg);
#if 0
for (int i = 0; i < pNewCfg->replicaNum; ++i) {
if (strcmp((pNewCfg->nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 &&
(pNewCfg->nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) {
IamInNew = true;
}
/*
SRaftId newId;
newId.addr = syncUtilAddr2U64((pNewCfg->nodeInfo)[i].nodeFqdn, (pNewCfg->nodeInfo)[i].nodePort);
newId.vgId = pSyncNode->vgId;
if (syncUtilSameId(&(pSyncNode->myRaftId), &newId)) {
IamInNew = true;
}
*/
}
#endif
if (!IamInNew) {
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
terrno = TSDB_CODE_SYN_NOT_IN_NEW_CONFIG;
......@@ -235,27 +217,6 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg) {
bool IamInNew = syncNodeInConfig(pSyncNode, pNewCfg);
#if 0
for (int i = 0; i < pNewCfg->replicaNum; ++i) {
if (strcmp((pNewCfg->nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 &&
(pNewCfg->nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) {
IamInNew = true;
}
/*
// some problem in inet_addr
SRaftId newId = EMPTY_RAFT_ID;
newId.addr = syncUtilAddr2U64((pNewCfg->nodeInfo)[i].nodeFqdn, (pNewCfg->nodeInfo)[i].nodePort);
newId.vgId = pSyncNode->vgId;
if (syncUtilSameId(&(pSyncNode->myRaftId), &newId)) {
IamInNew = true;
}
*/
}
#endif
if (!IamInNew) {
sError("sync reconfig error, not in new config");
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
......@@ -614,9 +575,6 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
return -1;
}
assert(rid == pSyncNode->rid);
sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId,
syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm,
TMSG_INFO(pMsg->msgType), pMsg->msgType);
ret = syncNodePropose(pSyncNode, pMsg, isWeak);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
......@@ -625,9 +583,10 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak) {
int32_t ret = 0;
sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId,
syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm,
TMSG_INFO(pMsg->msgType), pMsg->msgType);
char eventLog[128];
snprintf(eventLog, sizeof(eventLog), "propose type:%s,%d", TMSG_INFO(pMsg->msgType), pMsg->msgType);
syncNodeEventLog(pSyncNode, eventLog);
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
SRespStub stub;
......@@ -870,11 +829,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
// start raft
// syncNodeBecomeFollower(pSyncNode);
// snapshot meta
// pSyncNode->sMeta.lastConfigIndex = -1;
sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu sync open", pSyncNode->vgId,
syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm);
syncNodeEventLog(pSyncNode, "sync open");
return pSyncNode;
}
......@@ -921,8 +876,7 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) {
}
void syncNodeClose(SSyncNode* pSyncNode) {
sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu sync close", pSyncNode->vgId,
syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm);
syncNodeEventLog(pSyncNode, "sync close");
int32_t ret;
assert(pSyncNode != NULL);
......@@ -1306,6 +1260,27 @@ char* syncNode2Str(const SSyncNode* pSyncNode) {
return serialized;
}
void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
int32_t userStrLen = strlen(str);
if (userStrLen < 256) {
char logBuf[128 + 256];
snprintf(logBuf, sizeof(logBuf),
"vgId:%d %s term:%lu commit:%ld standby:%d replica-num:%d lconfig:%ld sync event %s", pSyncNode->vgId,
syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex,
pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, str);
sDebug("%s", logBuf);
} else {
int len = 128 + userStrLen;
char* s = (char*)taosMemoryMalloc(len);
snprintf(s, len, "vgId:%d %s term:%lu commit:%ld standby:%d replica-num:%d lconfig:%ld sync event %s",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm,
pSyncNode->commitIndex, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum,
pSyncNode->pRaftCfg->lastConfigIndex, str);
sDebug("%s", s);
taosMemoryFree(s);
}
}
char* syncNode2SimpleStr(const SSyncNode* pSyncNode) {
int len = 256;
char* s = (char*)taosMemoryMalloc(len);
......@@ -1361,12 +1336,10 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA];
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
oldSenders[i] = (pSyncNode->senders)[i];
sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu save senders %d, %p, privateTerm:%lu",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, i, oldSenders[i], oldSenders[i]->privateTerm);
if (gRaftDetailLog) {
;
}
char* eventLog = snapshotSender2SimpleStr(oldSenders[i], "snapshot sender save old");
syncNodeEventLog(pSyncNode, eventLog);
taosMemoryFree(eventLog);
}
// init internal
......@@ -1415,12 +1388,14 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
char host[128];
uint16_t port;
syncUtilU642Addr((pSyncNode->replicasId)[i].addr, host, sizeof(host), &port);
sDebug(
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu reset sender for %lu, newIndex:%d, %s:%d, %p, "
"privateTerm:%lu",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, (pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[j],
oldSenders[j]->privateTerm);
do {
char eventLog[128];
snprintf(eventLog, sizeof(eventLog), "snapshot sender reset for %lu, newIndex:%d, %s:%d, %p",
(pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[j]);
syncNodeEventLog(pSyncNode, eventLog);
} while (0);
(pSyncNode->senders)[i] = oldSenders[j];
oldSenders[j] = NULL;
reset = true;
......@@ -1428,11 +1403,13 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
// reset replicaIndex
int32_t oldreplicaIndex = (pSyncNode->senders)[i]->replicaIndex;
(pSyncNode->senders)[i]->replicaIndex = i;
sDebug(
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu udpate replicaIndex from %d to %d, %s:%d, %p, "
"reset:%d",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, oldreplicaIndex, i, host, port, (pSyncNode->senders)[i], reset);
do {
char eventLog[128];
snprintf(eventLog, sizeof(eventLog), "snapshot sender udpate replicaIndex from %d to %d, %s:%d, %p, reset:%d",
oldreplicaIndex, i, host, port, (pSyncNode->senders)[i], reset);
syncNodeEventLog(pSyncNode, eventLog);
} while (0);
}
}
}
......@@ -1441,10 +1418,10 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
if ((pSyncNode->senders)[i] == NULL) {
(pSyncNode->senders)[i] = snapshotSenderCreate(pSyncNode, i);
sDebug(
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu create new sender %p replicaIndex:%d, privateTerm:%lu",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, (pSyncNode->senders)[i], i, (pSyncNode->senders)[i]->privateTerm);
char* eventLog = snapshotSender2SimpleStr((pSyncNode->senders)[i], "snapshot sender create new");
syncNodeEventLog(pSyncNode, eventLog);
taosMemoryFree(eventLog);
}
}
......@@ -1452,9 +1429,13 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
if (oldSenders[i] != NULL) {
snapshotSenderDestroy(oldSenders[i]);
sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu delete old sender %p replicaIndex:%d",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, oldSenders[i], i);
do {
char eventLog[128];
snprintf(eventLog, sizeof(eventLog), "snapshot sender delete old %p replica-index:%d", oldSenders[i], i);
syncNodeEventLog(pSyncNode, eventLog);
} while (0);
oldSenders[i] = NULL;
}
}
......@@ -1462,24 +1443,6 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
bool IamInOld = syncNodeInConfig(pSyncNode, &oldConfig);
bool IamInNew = syncNodeInConfig(pSyncNode, pNewConfig);
#if 0
for (int i = 0; i < oldConfig.replicaNum; ++i) {
if (strcmp((oldConfig.nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 &&
(oldConfig.nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) {
IamInOld = true;
break;
}
}
for (int i = 0; i < newConfig->replicaNum; ++i) {
if (strcmp((newConfig->nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 &&
(newConfig->nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) {
IamInNew = true;
break;
}
}
#endif
*isDrop = true;
if (IamInOld && !IamInNew) {
*isDrop = true;
......@@ -1524,13 +1487,6 @@ void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
}
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
sDebug(
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu become follower, isStandBy:%d, replicaNum:%d, "
"restoreFinish:%d, %s",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum,
pSyncNode->restoreFinish, debugStr);
// maybe clear leader cache
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
pSyncNode->leaderCache = EMPTY_RAFT_ID;
......@@ -1542,6 +1498,21 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
// reset elect timer
syncNodeResetElectTimer(pSyncNode);
// trace log
do {
int32_t debugStrLen = strlen(debugStr);
if (debugStrLen < 256) {
char eventLog[256 + 64];
snprintf(eventLog, sizeof(eventLog), "become follower %s", debugStr);
syncNodeEventLog(pSyncNode, eventLog);
} else {
char* eventLog = taosMemoryMalloc(debugStrLen + 64);
snprintf(eventLog, debugStrLen, "become follower %s", debugStr);
syncNodeEventLog(pSyncNode, eventLog);
taosMemoryFree(eventLog);
}
} while (0);
}
// TLA+ Spec
......@@ -1566,13 +1537,6 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
// reset restoreFinish
pSyncNode->restoreFinish = false;
sDebug(
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu become leader, isStandBy:%d, replicaNum:%d, "
"restoreFinish:%d, %s",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum,
pSyncNode->restoreFinish, debugStr);
// state change
pSyncNode->state = TAOS_SYNC_STATE_LEADER;
......@@ -1618,6 +1582,21 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
// start heartbeat timer
syncNodeStartHeartbeatTimer(pSyncNode);
// trace log
do {
int32_t debugStrLen = strlen(debugStr);
if (debugStrLen < 256) {
char eventLog[256 + 64];
snprintf(eventLog, sizeof(eventLog), "become leader %s", debugStr);
syncNodeEventLog(pSyncNode, eventLog);
} else {
char* eventLog = taosMemoryMalloc(debugStrLen + 64);
snprintf(eventLog, debugStrLen, "become leader %s", debugStr);
syncNodeEventLog(pSyncNode, eventLog);
taosMemoryFree(eventLog);
}
} while (0);
}
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
......@@ -2147,30 +2126,26 @@ const char* syncStr(ESyncState state) {
static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) {
SyncLeaderTransfer* pSyncLeaderTransfer = syncLeaderTransferFromRpcMsg2(pRpcMsg);
sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu begin leader transfer", ths->vgId,
syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm);
syncNodeEventLog(ths, "begin leader transfer");
if (strcmp(pSyncLeaderTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 &&
pSyncLeaderTransfer->newNodeInfo.nodePort == ths->myNodeInfo.nodePort) {
sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu maybe leader transfer to %s:%d %lu", ths->vgId,
syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm,
pSyncLeaderTransfer->newNodeInfo.nodeFqdn, pSyncLeaderTransfer->newNodeInfo.nodePort,
pSyncLeaderTransfer->newLeaderId.addr);
bool sameId = syncUtilSameId(&(pSyncLeaderTransfer->newLeaderId), &(ths->myRaftId));
bool sameNodeInfo = strcmp(pSyncLeaderTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 &&
pSyncLeaderTransfer->newNodeInfo.nodePort == ths->myNodeInfo.nodePort;
bool same = sameId || sameNodeInfo;
if (same) {
// reset elect timer now!
int32_t electMS = 1;
int32_t ret = syncNodeRestartElectTimer(ths, electMS);
ASSERT(ret == 0);
char eventLog[256];
snprintf(eventLog, sizeof(eventLog), "maybe leader transfer to %s:%d %lu",
pSyncLeaderTransfer->newNodeInfo.nodeFqdn, pSyncLeaderTransfer->newNodeInfo.nodePort,
pSyncLeaderTransfer->newLeaderId.addr);
syncNodeEventLog(ths, eventLog);
}
/*
if (syncUtilSameId(&(pSyncLeaderTransfer->newLeaderId), &(ths->myRaftId))) {
// reset elect timer now!
int32_t electMS = 1;
int32_t ret = syncNodeRestartElectTimer(ths, electMS);
ASSERT(ret == 0);
}
*/
if (ths->pFsm->FpLeaderTransferCb != NULL) {
SFsmCbMeta cbMeta = {0};
cbMeta.code = 0;
......@@ -2235,12 +2210,10 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
// change isStandBy to normal
if (!isDrop) {
char tmpbuf[512];
char* oldStr = syncCfg2Str(&oldSyncCfg);
char* newStr = syncCfg2Str(&newSyncCfg);
syncUtilJson2Line(oldStr);
syncUtilJson2Line(newStr);
char* oldStr = syncCfg2SimpleStr(&oldSyncCfg);
char* newStr = syncCfg2SimpleStr(&newSyncCfg);
snprintf(tmpbuf, sizeof(tmpbuf), "config change from %d to %d, index:%ld, %s --> %s", oldSyncCfg.replicaNum,
newSyncCfg.replicaNum, pEntry->index, oldStr, newStr);
newSyncCfg.replicaNum, pEntry->index, oldStr, newStr);
taosMemoryFree(oldStr);
taosMemoryFree(newStr);
......@@ -2252,12 +2225,10 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
}
} else {
char tmpbuf[512];
char* oldStr = syncCfg2Str(&oldSyncCfg);
char* newStr = syncCfg2Str(&newSyncCfg);
syncUtilJson2Line(oldStr);
syncUtilJson2Line(newStr);
char* oldStr = syncCfg2SimpleStr(&oldSyncCfg);
char* newStr = syncCfg2SimpleStr(&newSyncCfg);
snprintf(tmpbuf, sizeof(tmpbuf), "config change2 from %d to %d, index:%ld, %s --> %s", oldSyncCfg.replicaNum,
newSyncCfg.replicaNum, pEntry->index, oldStr, newStr);
newSyncCfg.replicaNum, pEntry->index, oldStr, newStr);
taosMemoryFree(oldStr);
taosMemoryFree(newStr);
......@@ -2295,10 +2266,10 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) {
int32_t code = 0;
ESyncState state = flag;
sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu commit by wal from index:%" PRId64 " to index:%" PRId64
", %s",
ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, beginIndex,
endIndex, syncUtilState2String(state));
char eventLog[128];
snprintf(eventLog, sizeof(eventLog), "commit by wal from index:%ld to index:%ld", beginIndex, endIndex);
syncNodeEventLog(ths, eventLog);
// execute fsm
if (ths->pFsm != NULL) {
......@@ -2350,9 +2321,10 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
ths->pFsm->FpRestoreFinishCb(ths->pFsm);
}
ths->restoreFinish = true;
sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu restore finish, %s, index:%ld", ths->vgId,
syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm,
syncUtilState2String(ths->state), pEntry->index);
char eventLog[128];
snprintf(eventLog, sizeof(eventLog), "restore finish, index:%ld", pEntry->index);
syncNodeEventLog(ths, eventLog);
}
}
......
......@@ -101,6 +101,29 @@ char *syncCfg2Str(SSyncCfg *pSyncCfg) {
return serialized;
}
char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg) {
int32_t len = 512;
char *s = taosMemoryMalloc(len);
memset(s, 0, len);
snprintf(s, len, "{replica-num:%d, my-index:%d, ", pSyncCfg->replicaNum, pSyncCfg->myIndex);
char *p = s + strlen(s);
for (int i = 0; i < pSyncCfg->replicaNum; ++i) {
/*
if (p + 128 + 32 > s + len) {
break;
}
*/
char buf[128 + 32];
snprintf(buf, sizeof(buf), "%s:%d, ", pSyncCfg->nodeInfo[i].nodeFqdn, pSyncCfg->nodeInfo[i].nodePort);
strncpy(p, buf, sizeof(buf));
p = s + strlen(s);
}
strcpy(p - 2, "}");
return s;
}
int32_t syncCfgFromJson(const cJSON *pRoot, SSyncCfg *pSyncCfg) {
memset(pSyncCfg, 0, sizeof(SSyncCfg));
// cJSON *pJson = cJSON_GetObjectItem(pRoot, "SSyncCfg");
......@@ -284,6 +307,12 @@ void syncCfgLog2(char *s, SSyncCfg *pCfg) {
taosMemoryFree(serialized);
}
void syncCfgLog3(char *s, SSyncCfg *pCfg) {
char *serialized = syncCfg2SimpleStr(pCfg);
sTrace("syncCfgLog3 | len:%lu | %s | %s", strlen(serialized), s, serialized);
taosMemoryFree(serialized);
}
void raftCfgPrint(SRaftCfg *pCfg) {
char *serialized = raftCfg2Str(pCfg);
printf("raftCfgPrint | len:%lu | %s \n", strlen(serialized), serialized);
......
......@@ -163,12 +163,10 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
walFsync(pWal, true);
sDebug(
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu write index:%ld, isStandBy:%d, msgType:%s,%d, "
"originalRpcType:%s,%d",
pData->pSyncNode->vgId, syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->commitIndex,
pData->pSyncNode->pRaftStore->currentTerm, pEntry->index, pData->pSyncNode->pRaftCfg->isStandBy,
TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType);
char eventLog[128];
snprintf(eventLog, sizeof(eventLog), "write index:%ld, type:%s,%d, type2:%s,%d", pEntry->index,
TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType);
syncNodeEventLog(pData->pSyncNode, eventLog);
return code;
}
......@@ -320,16 +318,13 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
linuxErrMsg);
ASSERT(0);
}
// assert(code == 0);
walFsync(pWal, true);
sDebug(
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu old write index:%ld, isStandBy:%d, msgType:%s,%d, "
"originalRpcType:%s,%d",
pData->pSyncNode->vgId, syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->commitIndex,
pData->pSyncNode->pRaftStore->currentTerm, pEntry->index, pData->pSyncNode->pRaftCfg->isStandBy,
TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType);
char eventLog[128];
snprintf(eventLog, sizeof(eventLog), "old write index:%ld, type:%s,%d, type2:%s,%d", pEntry->index,
TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType);
syncNodeEventLog(pData->pSyncNode, eventLog);
return code;
}
......
......@@ -46,11 +46,11 @@ int64_t syncRespMgrAdd(SSyncRespMgr *pObj, SRespStub *pStub) {
taosHashPut(pObj->pRespHash, &keyCode, sizeof(keyCode), pStub, sizeof(SRespStub));
SSyncNode *pSyncNode = pObj->data;
sDebug(
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu resp mgr add, msgType:%s,%d seq:%lu handle:%p ahandle:%p",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, keyCode,
pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle);
char eventLog[128];
snprintf(eventLog, sizeof(eventLog), "resp mgr add, type:%s,%d, seq:%lu, handle:%p, ahandle:%p",
TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, keyCode, pStub->rpcMsg.info.handle,
pStub->rpcMsg.info.ahandle);
syncNodeEventLog(pSyncNode, eventLog);
taosThreadMutexUnlock(&(pObj->mutex));
return keyCode;
......@@ -73,12 +73,11 @@ int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub) {
memcpy(pStub, pTmp, sizeof(SRespStub));
SSyncNode *pSyncNode = pObj->data;
sDebug(
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu resp mgr get, msgType:%s,%d seq:%lu handle:%p "
"ahandle:%p",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, index,
pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle);
char eventLog[128];
snprintf(eventLog, sizeof(eventLog), "resp mgr get, type:%s,%d, seq:%lu, handle:%p, ahandle:%p",
TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, index, pStub->rpcMsg.info.handle,
pStub->rpcMsg.info.ahandle);
syncNodeEventLog(pSyncNode, eventLog);
taosThreadMutexUnlock(&(pObj->mutex));
return 1; // get one object
......@@ -95,12 +94,11 @@ int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStu
memcpy(pStub, pTmp, sizeof(SRespStub));
SSyncNode *pSyncNode = pObj->data;
sDebug(
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu resp mgr get and del, msgType:%s,%d seq:%lu handle:%p "
"ahandle:%p",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, index,
pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle);
char eventLog[128];
snprintf(eventLog, sizeof(eventLog), "resp mgr get-and-del, type:%s,%d, seq:%lu, handle:%p, ahandle:%p",
TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, index, pStub->rpcMsg.info.handle,
pStub->rpcMsg.info.ahandle);
syncNodeEventLog(pSyncNode, eventLog);
taosHashRemove(pObj->pRespHash, &index, sizeof(index));
taosThreadMutexUnlock(&(pObj->mutex));
......
此差异已折叠。
......@@ -55,6 +55,8 @@ SSyncCfg* createSyncCfg() {
void test1() {
SSyncCfg* pCfg = createSyncCfg();
syncCfgLog2((char*)__FUNCTION__, pCfg);
syncCfgLog3((char*)__FUNCTION__, pCfg);
taosMemoryFree(pCfg);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册