未验证 提交 7bdafa5f 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #18199 from taosdata/fix/TD-20439

fix(sync): fix timer memory leak
...@@ -73,11 +73,12 @@ typedef struct SSyncTimer { ...@@ -73,11 +73,12 @@ typedef struct SSyncTimer {
SSyncHbTimerData hbData; SSyncHbTimerData hbData;
} SSyncTimer; } SSyncTimer;
typedef struct SElectTimer { typedef struct SElectTimerParam {
uint64_t logicClock; uint64_t logicClock;
SSyncNode* pSyncNode; SSyncNode* pSyncNode;
int64_t executeTime;
void* pData; void* pData;
} SElectTimer; } SElectTimerParam;
typedef struct SPeerState { typedef struct SPeerState {
SyncIndex lastSendIndex; SyncIndex lastSendIndex;
...@@ -153,6 +154,7 @@ typedef struct SSyncNode { ...@@ -153,6 +154,7 @@ typedef struct SSyncNode {
uint64_t electTimerLogicClock; uint64_t electTimerLogicClock;
TAOS_TMR_CALLBACK FpElectTimerCB; // Timer Fp TAOS_TMR_CALLBACK FpElectTimerCB; // Timer Fp
uint64_t electTimerCounter; uint64_t electTimerCounter;
SElectTimerParam electTimerParam;
// heartbeat timer // heartbeat timer
tmr_h pHeartbeatTimer; tmr_h pHeartbeatTimer;
......
...@@ -1104,8 +1104,16 @@ int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) { ...@@ -1104,8 +1104,16 @@ int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
int32_t ret = 0; int32_t ret = 0;
if (syncIsInit()) { if (syncIsInit()) {
pSyncNode->electTimerMS = ms; pSyncNode->electTimerMS = ms;
int64_t execTime = taosGetTimestampMs() + ms;
atomic_store_64(&(pSyncNode->electTimerParam.executeTime), execTime);
atomic_store_64(&(pSyncNode->electTimerParam.logicClock), pSyncNode->electTimerLogicClock);
pSyncNode->electTimerParam.pSyncNode = pSyncNode;
pSyncNode->electTimerParam.pData = NULL;
taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pSyncNode, syncEnv()->pTimerManager, taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pSyncNode, syncEnv()->pTimerManager,
&pSyncNode->pElectTimer); &pSyncNode->pElectTimer);
} else { } else {
sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId); sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
} }
...@@ -1855,27 +1863,35 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { ...@@ -1855,27 +1863,35 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) {
} }
static void syncNodeEqElectTimer(void* param, void* tmrId) { static void syncNodeEqElectTimer(void* param, void* tmrId) {
SSyncNode* pNode = param;
if (!syncIsInit()) return; if (!syncIsInit()) return;
SSyncNode* pNode = (SSyncNode*)param;
if (pNode == NULL) return; if (pNode == NULL) return;
if (pNode->syncEqMsg == NULL) return; if (pNode->syncEqMsg == NULL) return;
int64_t tsNow = taosGetTimestampMs();
if (tsNow < pNode->electTimerParam.executeTime) return;
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
int32_t code = int32_t code =
syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pNode->electTimerLogicClock, pNode->electTimerMS, pNode); syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pNode->electTimerParam.logicClock, pNode->electTimerMS, pNode);
if (code != 0) { if (code != 0) {
sError("failed to build elect msg"); sError("failed to build elect msg");
return; return;
} }
SyncTimeout* pTimeout = rpcMsg.pCont; SyncTimeout* pTimeout = rpcMsg.pCont;
sTrace("enqueue elect msg lc:%" PRId64, pTimeout->logicClock); sNTrace(pNode, "enqueue elect msg lc:%" PRId64, pTimeout->logicClock);
code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg); code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
if (code != 0) { if (code != 0) {
sError("failed to sync enqueue elect msg since %s", terrstr()); sError("failed to sync enqueue elect msg since %s", terrstr());
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
return;
} }
} }
......
...@@ -193,7 +193,7 @@ static void syncPeerState2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) { ...@@ -193,7 +193,7 @@ static void syncPeerState2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
} }
void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNode* pNode, const char* format, ...) { void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNode* pNode, const char* format, ...) {
if (pNode == NULL || pNode->pRaftCfg != NULL && pNode->pRaftStore == NULL || pNode->pLogStore == NULL) return; if (pNode == NULL || pNode->pRaftCfg == NULL || pNode->pRaftStore == NULL || pNode->pLogStore == NULL) return;
int64_t currentTerm = pNode->pRaftStore->currentTerm; int64_t currentTerm = pNode->pRaftStore->currentTerm;
// save error code, otherwise it will be overwritten // save error code, otherwise it will be overwritten
...@@ -252,7 +252,7 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo ...@@ -252,7 +252,7 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo
void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotSender* pSender, void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotSender* pSender,
const char* format, ...) { const char* format, ...) {
SSyncNode* pNode = pSender->pSyncNode; SSyncNode* pNode = pSender->pSyncNode;
if (pNode == NULL || pNode->pRaftCfg != NULL && pNode->pRaftStore == NULL || pNode->pLogStore == NULL) return; if (pNode == NULL || pNode->pRaftCfg == NULL || pNode->pRaftStore == NULL || pNode->pLogStore == NULL) return;
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
if (pNode->pFsm != NULL && pNode->pFsm->FpGetSnapshotInfo != NULL) { if (pNode->pFsm != NULL && pNode->pFsm->FpGetSnapshotInfo != NULL) {
...@@ -304,7 +304,7 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla ...@@ -304,7 +304,7 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla
void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotReceiver* pReceiver, void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotReceiver* pReceiver,
const char* format, ...) { const char* format, ...) {
SSyncNode* pNode = pReceiver->pSyncNode; SSyncNode* pNode = pReceiver->pSyncNode;
if (pNode == NULL || pNode->pRaftCfg != NULL && pNode->pRaftStore == NULL || pNode->pLogStore == NULL) return; if (pNode == NULL || pNode->pRaftCfg == NULL || pNode->pRaftStore == NULL || pNode->pLogStore == NULL) return;
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
if (pNode->pFsm != NULL && pNode->pFsm->FpGetSnapshotInfo != NULL) { if (pNode->pFsm != NULL && pNode->pFsm->FpGetSnapshotInfo != NULL) {
...@@ -554,4 +554,4 @@ void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteRepl ...@@ -554,4 +554,4 @@ void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteRepl
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "send sync-request-vote-reply to %s:%d {term:%" PRId64 ", grant:%d}, %s", host, port, pMsg->term, sNTrace(pSyncNode, "send sync-request-vote-reply to %s:%d {term:%" PRId64 ", grant:%d}, %s", host, port, pMsg->term,
pMsg->voteGranted, s); pMsg->voteGranted, s);
} }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册