未验证 提交 a4e6d841 作者: Shengliang Guan 提交者: GitHub

Merge pull request #18128 from taosdata/fix/sync-error-log

fix(sync): do not use sNError in timer function
...@@ -1806,7 +1806,7 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { ...@@ -1806,7 +1806,7 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) {
int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock), int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock),
pNode->pingTimerMS, pNode); pNode->pingTimerMS, pNode);
if (code != 0) { if (code != 0) {
sNError(pNode, "failed to build ping msg"); sError("failed to build ping msg");
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
return; return;
} }
...@@ -1814,7 +1814,7 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { ...@@ -1814,7 +1814,7 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) {
sNTrace(pNode, "enqueue ping msg"); sNTrace(pNode, "enqueue ping msg");
code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg); code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
if (code != 0) { if (code != 0) {
sNError(pNode, "failed to sync enqueue ping msg since %s", terrstr()); sError("failed to sync enqueue ping msg since %s", terrstr());
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
return; return;
} }
...@@ -1839,7 +1839,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { ...@@ -1839,7 +1839,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) {
int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pElectTimer->logicClock, pNode->electTimerMS, pNode); int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pElectTimer->logicClock, pNode->electTimerMS, pNode);
if (code != 0) { if (code != 0) {
sNError(pNode, "failed to build elect msg"); sError("failed to build elect msg");
taosMemoryFree(pElectTimer); taosMemoryFree(pElectTimer);
return; return;
} }
...@@ -1849,7 +1849,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { ...@@ -1849,7 +1849,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) {
code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg); code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
if (code != 0) { if (code != 0) {
sNError(pNode, "failed to sync enqueue elect msg since %s", terrstr()); sError("failed to sync enqueue elect msg since %s", terrstr());
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
taosMemoryFree(pElectTimer); taosMemoryFree(pElectTimer);
return; return;
...@@ -1879,14 +1879,14 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { ...@@ -1879,14 +1879,14 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
pNode->heartbeatTimerMS, pNode); pNode->heartbeatTimerMS, pNode);
if (code != 0) { if (code != 0) {
sNError(pNode, "failed to build heartbeat msg"); sError("failed to build heartbeat msg");
return; return;
} }
sNTrace(pNode, "enqueue heartbeat timer"); sNTrace(pNode, "enqueue heartbeat timer");
code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg); code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
if (code != 0) { if (code != 0) {
sNError(pNode, "failed to enqueue heartbeat msg since %s", terrstr()); sError("failed to enqueue heartbeat msg since %s", terrstr());
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
return; return;
} }
...@@ -1971,7 +1971,7 @@ static int32_t syncNodeEqNoop(SSyncNode* pNode) { ...@@ -1971,7 +1971,7 @@ static int32_t syncNodeEqNoop(SSyncNode* pNode) {
sNTrace(pNode, "propose msg, type:noop"); sNTrace(pNode, "propose msg, type:noop");
code = (*pNode->syncEqMsg)(pNode->msgcb, &rpcMsg); code = (*pNode->syncEqMsg)(pNode->msgcb, &rpcMsg);
if (code != 0) { if (code != 0) {
sNError(pNode, "failed to propose noop msg while enqueue since %s", terrstr()); sError("failed to propose noop msg while enqueue since %s", terrstr());
} }
return code; return code;
...@@ -2005,7 +2005,7 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) { ...@@ -2005,7 +2005,7 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) {
if (ths->state == TAOS_SYNC_STATE_LEADER) { if (ths->state == TAOS_SYNC_STATE_LEADER) {
int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry); int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
if (code != 0) { if (code != 0) {
sNError(ths, "append noop error"); sError("append noop error");
return -1; return -1;
} }
} }
...@@ -2109,7 +2109,7 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -2109,7 +2109,7 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
syncNodeFollowerCommit(ths, pMsg->fcIndex); syncNodeFollowerCommit(ths, pMsg->fcIndex);
} else { } else {
sNError(ths, "error local cmd"); sError("error local cmd");
} }
return 0; return 0;
......
...@@ -198,9 +198,9 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr ...@@ -198,9 +198,9 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
syncMeta.seqNum = pEntry->seqNum; syncMeta.seqNum = pEntry->seqNum;
syncMeta.term = pEntry->term; syncMeta.term = pEntry->term;
int64_t tsWriteBegin = taosGetTimestampMs(); int64_t tsWriteBegin = taosGetTimestampNs();
index = walAppendLog(pWal, pEntry->originalRpcType, syncMeta, pEntry->data, pEntry->dataLen); index = walAppendLog(pWal, pEntry->originalRpcType, syncMeta, pEntry->data, pEntry->dataLen);
int64_t tsWriteEnd = taosGetTimestampMs(); int64_t tsWriteEnd = taosGetTimestampNs();
int64_t tsElapsed = tsWriteEnd - tsWriteBegin; int64_t tsElapsed = tsWriteEnd - tsWriteBegin;
if (index < 0) { if (index < 0) {
...@@ -239,12 +239,12 @@ int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncR ...@@ -239,12 +239,12 @@ int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncR
return -1; return -1;
} }
int64_t ts1 = taosGetTimestampNs();
taosThreadMutexLock(&(pData->mutex)); taosThreadMutexLock(&(pData->mutex));
int64_t tsBegin = taosGetTimestampMs(); int64_t ts2 = taosGetTimestampNs();
code = walReadVer(pWalHandle, index); code = walReadVer(pWalHandle, index);
int64_t tsEnd = taosGetTimestampMs(); int64_t ts3 = taosGetTimestampNs();
int64_t tsElapsed = tsEnd - tsBegin;
// code = walReadVerCached(pWalHandle, index); // code = walReadVerCached(pWalHandle, index);
if (code != 0) { if (code != 0) {
...@@ -289,6 +289,18 @@ int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncR ...@@ -289,6 +289,18 @@ int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncR
*/ */
taosThreadMutexUnlock(&(pData->mutex)); taosThreadMutexUnlock(&(pData->mutex));
int64_t ts4 = taosGetTimestampNs();
int64_t tsElapsed = ts4 - ts1;
int64_t tsElapsedLock = ts2 - ts1;
int64_t tsElapsedRead = ts3 - ts2;
int64_t tsElapsedBuild = ts4 - ts3;
sNTrace(pData->pSyncNode,
"read index:%" PRId64 ", elapsed:%" PRId64 ", elapsed-lock:%" PRId64 ", elapsed-read:%" PRId64
", elapsed-build:%" PRId64,
index, tsElapsed, tsElapsedLock, tsElapsedRead, tsElapsedBuild);
return code; return code;
} }
......
...@@ -72,7 +72,7 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapsh ...@@ -72,7 +72,7 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapsh
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
SyncAppendEntries* pMsg = NULL; SyncAppendEntries* pMsg = NULL;
SSyncRaftEntry* pEntry; SSyncRaftEntry* pEntry = NULL;
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, nextIndex, &pEntry); int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, nextIndex, &pEntry);
if (code == 0) { if (code == 0) {
...@@ -99,6 +99,8 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapsh ...@@ -99,6 +99,8 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapsh
} }
} }
syncEntryDestory(pEntry);
// prepare msg // prepare msg
ASSERT(pMsg != NULL); ASSERT(pMsg != NULL);
pMsg->srcId = pSyncNode->myRaftId; pMsg->srcId = pSyncNode->myRaftId;
......
...@@ -194,6 +194,7 @@ static void syncPeerState2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) { ...@@ -194,6 +194,7 @@ static void syncPeerState2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNode* pNode, const char* format, ...) { void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNode* pNode, const char* format, ...) {
if (pNode == NULL || pNode->pRaftCfg != NULL && pNode->pRaftStore == NULL || pNode->pLogStore == NULL) return; if (pNode == NULL || pNode->pRaftCfg != NULL && pNode->pRaftStore == NULL || pNode->pLogStore == NULL) return;
int64_t currentTerm = pNode->pRaftStore->currentTerm;
// save error code, otherwise it will be overwritten // save error code, otherwise it will be overwritten
int32_t errCode = terrno; int32_t errCode = terrno;
...@@ -235,8 +236,8 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo ...@@ -235,8 +236,8 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo
", tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64 ", tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64
", snap-tm:%" PRIu64 ", sby:%d, aq:%d, bch:%d, r-num:%d, lcfg:%" PRId64 ", snap-tm:%" PRIu64 ", sby:%d, aq:%d, bch:%d, r-num:%d, lcfg:%" PRId64
", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s", ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s",
pNode->vgId, syncStr(pNode->state), eventLog, pNode->pRaftStore->currentTerm, pNode->commitIndex, pNode->vgId, syncStr(pNode->state), eventLog, currentTerm, pNode->commitIndex, logBeginIndex,
logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
pNode->pRaftCfg->isStandBy, aqItems, pNode->pRaftCfg->batchSize, pNode->replicaNum, pNode->pRaftCfg->isStandBy, aqItems, pNode->pRaftCfg->batchSize, pNode->replicaNum,
pNode->pRaftCfg->lastConfigIndex, pNode->changing, pNode->restoreFinish, quorum, pNode->pRaftCfg->lastConfigIndex, pNode->changing, pNode->restoreFinish, quorum,
pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr); pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr);
...@@ -374,9 +375,9 @@ void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntries ...@@ -374,9 +375,9 @@ void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntries
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, sNTrace(pSyncNode,
"recv sync-append-entries-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64 ", success:%d, match:%" PRId64 "recv sync-append-entries-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64
"}, %s", ", success:%d, lsend-index:%" PRId64 ", match:%" PRId64 "}, %s",
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s); host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->lastSendIndex, pMsg->matchIndex, s);
} }
void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) { void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) {
...@@ -511,8 +512,8 @@ void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMs ...@@ -511,8 +512,8 @@ void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMs
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, sNTrace(pSyncNode,
"send sync-append-entries to %s:%d, {term:%" PRId64 ", pre-index:%" PRId64 ", pre-term:%" PRId64 "send sync-append-entries to %s:%d, {term:%" PRId64 ", pre-index:%" PRId64 ", pre-term:%" PRId64
", pterm:%" PRId64 ", cmt:%" PRId64 ", datalen:%d}, %s", ", lsend-index:%" PRId64 ", cmt:%" PRId64 ", datalen:%d}, %s",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, pMsg->commitIndex, host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, (pMsg->prevLogIndex + 1), pMsg->commitIndex,
pMsg->dataLen, s); pMsg->dataLen, s);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册