提交 effc38d2 编写于 作者: M Minghao Li

fix(sync): fix asan error, use rid in hbdata and syncnode

上级 6208aeb1
...@@ -57,6 +57,11 @@ void syncNodeRemove(int64_t rid); ...@@ -57,6 +57,11 @@ void syncNodeRemove(int64_t rid);
SSyncNode* syncNodeAcquire(int64_t rid); SSyncNode* syncNodeAcquire(int64_t rid);
void syncNodeRelease(SSyncNode* pNode); void syncNodeRelease(SSyncNode* pNode);
int64_t syncHbTimerDataAdd(SSyncHbTimerData* pData);
void syncHbTimerDataRemove(int64_t rid);
SSyncHbTimerData* syncHbTimerDataAcquire(int64_t rid);
void syncHbTimerDataRelease(SSyncHbTimerData* pData);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -57,10 +57,11 @@ typedef struct SRaftId { ...@@ -57,10 +57,11 @@ typedef struct SRaftId {
} SRaftId; } SRaftId;
typedef struct SSyncHbTimerData { typedef struct SSyncHbTimerData {
SSyncNode* pSyncNode; int64_t syncNodeRid;
SSyncTimer* pTimer; SSyncTimer* pTimer;
SRaftId destId; SRaftId destId;
uint64_t logicClock; uint64_t logicClock;
int64_t rid;
} SSyncHbTimerData; } SSyncHbTimerData;
typedef struct SSyncTimer { typedef struct SSyncTimer {
...@@ -70,7 +71,7 @@ typedef struct SSyncTimer { ...@@ -70,7 +71,7 @@ typedef struct SSyncTimer {
uint64_t counter; uint64_t counter;
int32_t timerMS; int32_t timerMS;
SRaftId destId; SRaftId destId;
SSyncHbTimerData hbData; int64_t hbDataRid;
} SSyncTimer; } SSyncTimer;
typedef struct SElectTimerParam { typedef struct SElectTimerParam {
...@@ -189,6 +190,8 @@ typedef struct SSyncNode { ...@@ -189,6 +190,8 @@ typedef struct SSyncNode {
int64_t leaderTime; int64_t leaderTime;
int64_t lastReplicateTime; int64_t lastReplicateTime;
bool isStart;
} SSyncNode; } SSyncNode;
// open/close -------------- // open/close --------------
...@@ -198,6 +201,7 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode); ...@@ -198,6 +201,7 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode);
void syncNodeClose(SSyncNode* pSyncNode); void syncNodeClose(SSyncNode* pSyncNode);
void syncNodePreClose(SSyncNode* pSyncNode); void syncNodePreClose(SSyncNode* pSyncNode);
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak); int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak);
void syncHbTimerDataFree(SSyncHbTimerData* pData);
// on message --------------------- // on message ---------------------
int32_t syncNodeOnTimeout(SSyncNode* ths, const SRpcMsg* pMsg); int32_t syncNodeOnTimeout(SSyncNode* ths, const SRpcMsg* pMsg);
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
static SSyncEnv gSyncEnv = {0}; static SSyncEnv gSyncEnv = {0};
static int32_t gNodeRefId = -1; static int32_t gNodeRefId = -1;
static int32_t gHbDataRefId = -1;
static void syncEnvTick(void *param, void *tmrId); static void syncEnvTick(void *param, void *tmrId);
SSyncEnv *syncEnv() { return &gSyncEnv; } SSyncEnv *syncEnv() { return &gSyncEnv; }
...@@ -50,6 +51,13 @@ int32_t syncInit() { ...@@ -50,6 +51,13 @@ int32_t syncInit() {
return -1; return -1;
} }
gHbDataRefId = taosOpenRef(200, (RefFp)syncHbTimerDataFree);
if (gHbDataRefId < 0) {
sError("failed to init hb-data ref");
syncCleanUp();
return -1;
}
sDebug("sync rsetId:%d is open", gNodeRefId); sDebug("sync rsetId:%d is open", gNodeRefId);
return 0; return 0;
} }
...@@ -64,6 +72,12 @@ void syncCleanUp() { ...@@ -64,6 +72,12 @@ void syncCleanUp() {
taosCloseRef(gNodeRefId); taosCloseRef(gNodeRefId);
gNodeRefId = -1; gNodeRefId = -1;
} }
if (gHbDataRefId != -1) {
sDebug("sync rsetId:%d is closed", gHbDataRefId);
taosCloseRef(gHbDataRefId);
gHbDataRefId = -1;
}
} }
int64_t syncNodeAdd(SSyncNode *pNode) { int64_t syncNodeAdd(SSyncNode *pNode) {
...@@ -88,6 +102,26 @@ SSyncNode *syncNodeAcquire(int64_t rid) { ...@@ -88,6 +102,26 @@ SSyncNode *syncNodeAcquire(int64_t rid) {
void syncNodeRelease(SSyncNode *pNode) { taosReleaseRef(gNodeRefId, pNode->rid); } void syncNodeRelease(SSyncNode *pNode) { taosReleaseRef(gNodeRefId, pNode->rid); }
int64_t syncHbTimerDataAdd(SSyncHbTimerData *pData) {
pData->rid = taosAddRef(gHbDataRefId, pData);
if (pData->rid < 0) return -1;
return pData->rid;
}
void syncHbTimerDataRemove(int64_t rid) { taosRemoveRef(gHbDataRefId, rid); }
SSyncHbTimerData *syncHbTimerDataAcquire(int64_t rid) {
SSyncHbTimerData *pData = taosAcquireRef(gHbDataRefId, rid);
if (pData == NULL) {
sError("failed to acquire hb-timer-data from refId:%" PRId64, rid);
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
}
return pData;
}
void syncHbTimerDataRelease(SSyncHbTimerData *pData) { taosReleaseRef(gHbDataRefId, pData->rid); }
#if 0 #if 0
void syncEnvStartTimer() { void syncEnvStartTimer() {
taosTmrReset(gSyncEnv.FpEnvTickTimer, gSyncEnv.envTickTimerMS, &gSyncEnv, gSyncEnv.pTimerManager, taosTmrReset(gSyncEnv.FpEnvTickTimer, gSyncEnv.envTickTimerMS, &gSyncEnv, gSyncEnv.pTimerManager,
......
...@@ -91,6 +91,7 @@ void syncStart(int64_t rid) { ...@@ -91,6 +91,7 @@ void syncStart(int64_t rid) {
void syncStop(int64_t rid) { void syncStop(int64_t rid) {
SSyncNode* pSyncNode = syncNodeAcquire(rid); SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode != NULL) { if (pSyncNode != NULL) {
pSyncNode->isStart = false;
syncNodeRelease(pSyncNode); syncNodeRelease(pSyncNode);
syncNodeRemove(rid); syncNodeRemove(rid);
} }
...@@ -665,13 +666,20 @@ static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRa ...@@ -665,13 +666,20 @@ static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRa
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
int32_t ret = 0; int32_t ret = 0;
if (syncIsInit()) { if (syncIsInit()) {
SSyncHbTimerData* pData = &pSyncTimer->hbData; SSyncHbTimerData* pData = syncHbTimerDataAcquire(pSyncTimer->hbDataRid);
pData->pSyncNode = pSyncNode; if (pData == NULL) {
pData = taosMemoryMalloc(sizeof(SSyncHbTimerData));
pData->rid = syncHbTimerDataAdd(pData);
}
pSyncTimer->hbDataRid = pData->rid;
pData->syncNodeRid = pSyncNode->rid;
pData->pTimer = pSyncTimer; pData->pTimer = pSyncTimer;
pData->destId = pSyncTimer->destId; pData->destId = pSyncTimer->destId;
pData->logicClock = pSyncTimer->logicClock; pData->logicClock = pSyncTimer->logicClock;
taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS, pData, syncEnv()->pTimerManager, &pSyncTimer->pTimer); taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS, (void*)(pData->rid), syncEnv()->pTimerManager,
&pSyncTimer->pTimer);
} else { } else {
sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId); sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId);
} }
...@@ -683,6 +691,8 @@ static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { ...@@ -683,6 +691,8 @@ static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
atomic_add_fetch_64(&pSyncTimer->logicClock, 1); atomic_add_fetch_64(&pSyncTimer->logicClock, 1);
taosTmrStop(pSyncTimer->pTimer); taosTmrStop(pSyncTimer->pTimer);
pSyncTimer->pTimer = NULL; pSyncTimer->pTimer = NULL;
syncHbTimerDataRemove(pSyncTimer->hbDataRid);
pSyncTimer->hbDataRid = -1;
return ret; return ret;
} }
...@@ -960,6 +970,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) { ...@@ -960,6 +970,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
// snapshotting // snapshotting
atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID); atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
pSyncNode->isStart = true;
sNTrace(pSyncNode, "sync open"); sNTrace(pSyncNode, "sync open");
return pSyncNode; return pSyncNode;
...@@ -1027,6 +1038,8 @@ void syncNodePreClose(SSyncNode* pSyncNode) { ...@@ -1027,6 +1038,8 @@ void syncNodePreClose(SSyncNode* pSyncNode) {
syncNodeStopHeartbeatTimer(pSyncNode); syncNodeStopHeartbeatTimer(pSyncNode);
} }
void syncHbTimerDataFree(SSyncHbTimerData* pData) { taosMemoryFree(pData); }
void syncNodeClose(SSyncNode* pSyncNode) { void syncNodeClose(SSyncNode* pSyncNode) {
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
return; return;
...@@ -1929,19 +1942,36 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { ...@@ -1929,19 +1942,36 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
} }
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) { static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
SSyncHbTimerData* pData = (SSyncHbTimerData*)param; int64_t hbDataRid = (int64_t)param;
SSyncNode* pSyncNode = pData->pSyncNode;
SSyncTimer* pSyncTimer = pData->pTimer;
SSyncHbTimerData* pData = syncHbTimerDataAcquire(hbDataRid);
if (pData == NULL) {
return;
}
SSyncNode* pSyncNode = syncNodeAcquire(pData->syncNodeRid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
syncHbTimerDataRelease(pData);
return;
}
SSyncTimer* pSyncTimer = pData->pTimer;
if (!pSyncNode->isStart) {
syncNodeRelease(pSyncNode);
syncHbTimerDataRelease(pData);
return; return;
} }
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) { if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
syncNodeRelease(pSyncNode);
syncHbTimerDataRelease(pData);
return; return;
} }
if (pSyncNode->pRaftStore == NULL) { if (pSyncNode->pRaftStore == NULL) {
syncNodeRelease(pSyncNode);
syncHbTimerDataRelease(pData);
return; return;
} }
...@@ -1978,6 +2008,9 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) { ...@@ -1978,6 +2008,9 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
msgLogicClock); msgLogicClock);
} }
} }
syncHbTimerDataRelease(pData);
syncNodeRelease(pSyncNode);
} }
static int32_t syncNodeEqNoop(SSyncNode* pNode) { static int32_t syncNodeEqNoop(SSyncNode* pNode) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册