diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 95ee2ca2bc8fa6907ea3b03679ad4d9ef57142ab..1a94dcf42668edea01f86ad55ae9ab0fb27d5241 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -200,10 +200,10 @@ typedef struct SSyncInfo { int32_t syncInit(); void syncCleanUp(); +bool syncIsInit(); int64_t syncOpen(SSyncInfo* pSyncInfo); void syncStart(int64_t rid); void syncStop(int64_t rid); -int32_t syncSetStandby(int64_t rid); ESyncState syncGetMyRole(int64_t rid); bool syncIsReady(int64_t rid); const char* syncGetMyRoleStr(int64_t rid); @@ -216,7 +216,6 @@ void syncGetEpSet(int64_t rid, SEpSet* pEpSet); void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet); int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak); // int32_t syncProposeBatch(int64_t rid, SRpcMsg** pMsgPArr, bool* pIsWeakArr, int32_t arrSize); -bool syncEnvIsStart(); const char* syncStr(ESyncState state); bool syncIsRestoreFinish(int64_t rid); int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapshot); @@ -234,6 +233,9 @@ int32_t syncEndSnapshot(int64_t rid); int32_t syncStepDown(int64_t rid, SyncTerm newTerm); +SSyncNode* syncNodeAcquire(int64_t rid); +void syncNodeRelease(SSyncNode* pNode); + #ifdef __cplusplus } #endif diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index a1cff2b73832bc1aa6a00f920e4e30a0f7835060..9cb8a9d564b564526e6cb764bba4f083460cb3da 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -28,10 +28,6 @@ typedef struct SRaftId { SyncGroupId vgId; } SRaftId; -// ------------------ control ------------------- -SSyncNode* syncNodeAcquire(int64_t rid); -void syncNodeRelease(SSyncNode* pNode); - int32_t syncGetRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg); int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcHandleInfo* pInfo); void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb); diff --git a/include/util/tref.h b/include/util/tref.h index c2cc54cb07ac3167941061b475f8811e460a3b91..c4b2ec8fa72b1ec58456f76a20f5ac7757e014f7 100644 --- a/include/util/tref.h +++ b/include/util/tref.h @@ -25,7 +25,8 @@ extern "C" { // open a reference set, max is the mod used by hash, fp is the pointer to free resource function // return rsetId which will be used by other APIs. On error, -1 is returned, and terrno is set appropriately -int32_t taosOpenRef(int32_t max, void (*fp)(void *)); +typedef void (*RefFp)(void *); +int32_t taosOpenRef(int32_t max, RefFp fp); // close the reference set, refId is the return value by taosOpenRef // return 0 if success. On error, -1 is returned, and terrno is set appropriately diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 98a24286f61711e9d9835dff0cb3c5f969ca79eb..53e7b1cd26c385eb1d4e9303cf6db98a940dbc3f 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -481,7 +481,7 @@ int32_t mndProcessSyncCtrlMsg(SRpcMsg *pMsg) { mInfo("vgId:%d, process sync ctrl msg", 1); - if (!syncEnvIsStart()) { + if (!syncIsInit()) { mError("failed to process sync msg:%p type:%s since syncEnv stop", pMsg, TMSG_INFO(pMsg->msgType)); terrno = TSDB_CODE_SYN_INTERNAL_ERROR; return -1; @@ -518,7 +518,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; int32_t code = 0; - if (!syncEnvIsStart()) { + if (!syncIsInit()) { mError("failed to process sync msg:%p type:%s since syncEnv stop", pMsg, TMSG_INFO(pMsg->msgType)); terrno = TSDB_CODE_SYN_INTERNAL_ERROR; return -1; @@ -581,11 +581,6 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { code = syncNodeOnSnapshotReply(pSyncNode, pSyncMsg); syncSnapshotRspDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_SYNC_SET_MNODE_STANDBY) { - code = syncSetStandby(pMgmt->sync); - SRpcMsg rsp = {.code = code, .info = pMsg->info}; - tmsgSendRsp(&rsp); - } else if (pMsg->msgType == TDMT_SYNC_LOCAL_CMD) { SyncLocalCmd *pSyncMsg = syncLocalCmdFromRpcMsg2(pMsg); code = syncNodeOnLocalCmd(pSyncNode, pSyncMsg); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index bf665fd6db32f7ebd2a82b97bc7a556b41c05bb2..339be16c91064872455b303bd555deb5b6ef0414 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -234,7 +234,7 @@ int32_t vnodeProcessSyncCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { int32_t code = 0; const STraceId *trace = &pMsg->info.traceId; - if (!syncEnvIsStart()) { + if (!syncIsInit()) { vGError("vgId:%d, msg:%p failed to process since sync env not start", pVnode->config.vgId, pMsg); terrno = TSDB_CODE_APP_ERROR; return -1; @@ -277,7 +277,7 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { int32_t code = 0; const STraceId *trace = &pMsg->info.traceId; - if (!syncEnvIsStart()) { + if (!syncIsInit()) { vGError("vgId:%d, msg:%p failed to process since sync env not start", pVnode->config.vgId, pMsg); terrno = TSDB_CODE_APP_ERROR; return -1; diff --git a/source/libs/sync/inc/syncEnv.h b/source/libs/sync/inc/syncEnv.h index 06da0eb3dfe8066ae52372763e765530a393fc66..068ccaf02906f7dee16e869542c803d65b4c99a0 100644 --- a/source/libs/sync/inc/syncEnv.h +++ b/source/libs/sync/inc/syncEnv.h @@ -20,13 +20,7 @@ extern "C" { #endif -#include -#include -#include #include "syncInt.h" -#include "taosdef.h" -#include "trpc.h" -#include "ttimer.h" #define TIMER_MAX_MS 0x7FFFFFFF #define ENV_TICK_TIMER_MS 1000 @@ -57,12 +51,12 @@ typedef struct SSyncEnv { } SSyncEnv; -extern SSyncEnv* gSyncEnv; +SSyncEnv* syncEnv(); -int32_t syncEnvStart(); -int32_t syncEnvStop(); -int32_t syncEnvStartTimer(); -int32_t syncEnvStopTimer(); +int64_t syncNodeAdd(SSyncNode* pNode); +void syncNodeRemove(SSyncNode* pNode); +SSyncNode* syncNodeAcquire(int64_t rid); +void syncNodeRelease(SSyncNode* pNode); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index f4949e1016b18336faa302b065e04aab20689185..843dee134272350ff0c83297d0484390e536de2d 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -24,6 +24,8 @@ extern "C" { #include "syncTools.h" #include "tlog.h" #include "ttimer.h" +#include "taosdef.h" +#include "ttimer.h" // clang-format off #define sFatal(...) do { if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("SYN FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0) @@ -255,9 +257,6 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, Sync SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode); char* syncNodePeerState2Str(const SSyncNode* pSyncNode); -SSyncNode* syncNodeAcquire(int64_t rid); -void syncNodeRelease(SSyncNode* pNode); - // raft state change -------------- void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term); void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term); @@ -302,9 +301,6 @@ bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta); int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta); -void syncStartNormal(int64_t rid); -void syncStartStandBy(int64_t rid); - bool syncNodeCanChange(SSyncNode* pSyncNode); bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pNewCfg); diff --git a/source/libs/sync/src/syncEnv.c b/source/libs/sync/src/syncEnv.c index fb0bfb8bef318c2f8d2d53f12cfabcfa4fb30ef2..4d4cb8ab69e2b046aaf5db78b6c2d04daf1de10c 100644 --- a/source/libs/sync/src/syncEnv.c +++ b/source/libs/sync/src/syncEnv.c @@ -13,118 +13,114 @@ * along with this program. If not, see . */ +#define _DEFAULT_SOURCE #include "syncEnv.h" -// #include +#include "tref.h" -SSyncEnv *gSyncEnv = NULL; +static SSyncEnv gSyncEnv = {0}; +static int32_t gNodeRefId = -1; +bool gRaftDetailLog = false; +static void syncEnvTick(void *param, void *tmrId); -// local function ----------------- -static SSyncEnv *doSyncEnvStart(); -static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv); -static int32_t doSyncEnvStartTimer(SSyncEnv *pSyncEnv); -static int32_t doSyncEnvStopTimer(SSyncEnv *pSyncEnv); -static void syncEnvTick(void *param, void *tmrId); -// -------------------------------- +SSyncEnv *syncEnv() { return &gSyncEnv; } -bool syncEnvIsStart() { - if (gSyncEnv == NULL) { - return false; - } +bool syncIsInit() { return atomic_load_8(&gSyncEnv.isStart); } - return atomic_load_8(&(gSyncEnv->isStart)); -} +int32_t syncInit() { + if (syncIsInit()) return 0; -int32_t syncEnvStart() { - int32_t ret = 0; uint32_t seed = (uint32_t)(taosGetTimestampNs() & 0x00000000FFFFFFFF); taosSeedRand(seed); - // gSyncEnv = doSyncEnvStart(gSyncEnv); - gSyncEnv = doSyncEnvStart(); - ASSERT(gSyncEnv != NULL); - sTrace("sync env start ok"); - return ret; -} -int32_t syncEnvStop() { - int32_t ret = doSyncEnvStop(gSyncEnv); - return ret; -} + memset(&gSyncEnv, 0, sizeof(SSyncEnv)); + gSyncEnv.envTickTimerCounter = 0; + gSyncEnv.envTickTimerMS = ENV_TICK_TIMER_MS; + gSyncEnv.FpEnvTickTimer = syncEnvTick; + atomic_store_64(&gSyncEnv.envTickTimerLogicClock, 0); + atomic_store_64(&gSyncEnv.envTickTimerLogicClockUser, 0); -int32_t syncEnvStartTimer() { - int32_t ret = doSyncEnvStartTimer(gSyncEnv); - return ret; -} + // start tmr thread + gSyncEnv.pTimerManager = taosTmrInit(1000, 50, 10000, "SYNC-ENV"); + atomic_store_8(&gSyncEnv.isStart, 1); + + gNodeRefId = taosOpenRef(200, (RefFp)syncNodeClose); + if (gNodeRefId < 0) { + sError("failed to init node ref"); + syncCleanUp(); + return -1; + } -int32_t syncEnvStopTimer() { - int32_t ret = doSyncEnvStopTimer(gSyncEnv); - return ret; + sDebug("sync rsetId:%d is open", gNodeRefId); + return 0; } -// local function ----------------- -static void syncEnvTick(void *param, void *tmrId) { - SSyncEnv *pSyncEnv = (SSyncEnv *)param; - if (atomic_load_64(&pSyncEnv->envTickTimerLogicClockUser) <= atomic_load_64(&pSyncEnv->envTickTimerLogicClock)) { - ++(pSyncEnv->envTickTimerCounter); - sTrace("syncEnvTick do ... envTickTimerLogicClockUser:%" PRIu64 ", envTickTimerLogicClock:%" PRIu64 - ", envTickTimerCounter:%" PRIu64 - ", " - "envTickTimerMS:%d, tmrId:%p", - pSyncEnv->envTickTimerLogicClockUser, pSyncEnv->envTickTimerLogicClock, pSyncEnv->envTickTimerCounter, - pSyncEnv->envTickTimerMS, tmrId); +void syncCleanUp() { + atomic_store_8(&gSyncEnv.isStart, 0); + taosTmrCleanUp(gSyncEnv.pTimerManager); + memset(&gSyncEnv, 0, sizeof(SSyncEnv)); - // do something, tick ... - taosTmrReset(syncEnvTick, pSyncEnv->envTickTimerMS, pSyncEnv, pSyncEnv->pTimerManager, &pSyncEnv->pEnvTickTimer); - } else { - sTrace("syncEnvTick pass ... envTickTimerLogicClockUser:%" PRIu64 ", envTickTimerLogicClock:%" PRIu64 - ", envTickTimerCounter:%" PRIu64 - ", " - "envTickTimerMS:%d, tmrId:%p", - pSyncEnv->envTickTimerLogicClockUser, pSyncEnv->envTickTimerLogicClock, pSyncEnv->envTickTimerCounter, - pSyncEnv->envTickTimerMS, tmrId); + if (gNodeRefId != -1) { + sDebug("sync rsetId:%d is closed", gNodeRefId); + taosCloseRef(gNodeRefId); + gNodeRefId = -1; } } -static SSyncEnv *doSyncEnvStart() { - SSyncEnv *pSyncEnv = (SSyncEnv *)taosMemoryMalloc(sizeof(SSyncEnv)); - ASSERT(pSyncEnv != NULL); - memset(pSyncEnv, 0, sizeof(SSyncEnv)); - - pSyncEnv->envTickTimerCounter = 0; - pSyncEnv->envTickTimerMS = ENV_TICK_TIMER_MS; - pSyncEnv->FpEnvTickTimer = syncEnvTick; - atomic_store_64(&pSyncEnv->envTickTimerLogicClock, 0); - atomic_store_64(&pSyncEnv->envTickTimerLogicClockUser, 0); +int64_t syncNodeAdd(SSyncNode *pNode) { + pNode->rid = taosAddRef(gNodeRefId, pNode); + if (pNode->rid < 0) return -1; - // start tmr thread - pSyncEnv->pTimerManager = taosTmrInit(1000, 50, 10000, "SYNC-ENV"); + sDebug("vgId:%d, sync rid:%" PRId64 " is added to rsetId:%d", pNode->vgId, pNode->rid, gNodeRefId); + return 0; +} - atomic_store_8(&(pSyncEnv->isStart), 1); - return pSyncEnv; +void syncNodeRemove(SSyncNode *pNode) { + taosRemoveRef(gNodeRefId, pNode->rid); + sDebug("vgId:%d, sync rid:%" PRId64 " is removed from rsetId:%d", pNode->vgId, pNode->rid, gNodeRefId); } -static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv) { - ASSERT(pSyncEnv == gSyncEnv); - if (pSyncEnv != NULL) { - atomic_store_8(&(pSyncEnv->isStart), 0); - taosTmrCleanUp(pSyncEnv->pTimerManager); - taosMemoryFree(pSyncEnv); +SSyncNode *syncNodeAcquire(int64_t rid) { + SSyncNode *pNode = taosAcquireRef(gNodeRefId, rid); + if (pNode == NULL) { + sTrace("failed to acquire node from refId:%" PRId64, rid); } - gSyncEnv = NULL; - return 0; + + return pNode; } -static int32_t doSyncEnvStartTimer(SSyncEnv *pSyncEnv) { - int32_t ret = 0; - taosTmrReset(pSyncEnv->FpEnvTickTimer, pSyncEnv->envTickTimerMS, pSyncEnv, pSyncEnv->pTimerManager, - &pSyncEnv->pEnvTickTimer); - atomic_store_64(&pSyncEnv->envTickTimerLogicClock, pSyncEnv->envTickTimerLogicClockUser); - return ret; +void syncNodeRelease(SSyncNode *pNode) { taosReleaseRef(gNodeRefId, pNode->rid); } + +#if 0 +void syncEnvStartTimer() { + taosTmrReset(gSyncEnv.FpEnvTickTimer, gSyncEnv.envTickTimerMS, &gSyncEnv, gSyncEnv.pTimerManager, + &gSyncEnv.pEnvTickTimer); + atomic_store_64(&gSyncEnv.envTickTimerLogicClock, gSyncEnv.envTickTimerLogicClockUser); } -static int32_t doSyncEnvStopTimer(SSyncEnv *pSyncEnv) { +void syncEnvStopTimer() { int32_t ret = 0; - atomic_add_fetch_64(&pSyncEnv->envTickTimerLogicClockUser, 1); - taosTmrStop(pSyncEnv->pEnvTickTimer); - pSyncEnv->pEnvTickTimer = NULL; + atomic_add_fetch_64(&gSyncEnv.envTickTimerLogicClockUser, 1); + taosTmrStop(gSyncEnv.pEnvTickTimer); + gSyncEnv.pEnvTickTimer = NULL; return ret; } +#endif + +static void syncEnvTick(void *param, void *tmrId) { + SSyncEnv *pSyncEnv = param; + if (atomic_load_64(&gSyncEnv.envTickTimerLogicClockUser) <= atomic_load_64(&gSyncEnv.envTickTimerLogicClock)) { + gSyncEnv.envTickTimerCounter++; + sTrace("syncEnvTick do ... envTickTimerLogicClockUser:%" PRIu64 ", envTickTimerLogicClock:%" PRIu64 + ", envTickTimerCounter:%" PRIu64 ", envTickTimerMS:%d, tmrId:%p", + gSyncEnv.envTickTimerLogicClockUser, gSyncEnv.envTickTimerLogicClock, gSyncEnv.envTickTimerCounter, + gSyncEnv.envTickTimerMS, tmrId); + + // do something, tick ... + taosTmrReset(syncEnvTick, gSyncEnv.envTickTimerMS, pSyncEnv, gSyncEnv.pTimerManager, &gSyncEnv.pEnvTickTimer); + } else { + sTrace("syncEnvTick pass ... envTickTimerLogicClockUser:%" PRIu64 ", envTickTimerLogicClock:%" PRIu64 + ", envTickTimerCounter:%" PRIu64 ", envTickTimerMS:%d, tmrId:%p", + gSyncEnv.envTickTimerLogicClockUser, gSyncEnv.envTickTimerLogicClock, gSyncEnv.envTickTimerCounter, + gSyncEnv.envTickTimerMS, tmrId); + } +} diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 7142e8fb22f69b3d873a595059be945447c634c7..9ff8917b0f7fa7d45da8ffcb1617a91ca282a049 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -33,11 +33,6 @@ #include "syncTimeout.h" #include "syncUtil.h" #include "syncVoteMgr.h" -#include "tref.h" - -bool gRaftDetailLog = false; - -static int32_t tsNodeRefId = -1; // ------ local funciton --------- // enqueue message ---- @@ -52,138 +47,36 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId); int32_t syncNodeOnPing(SSyncNode* ths, SyncPing* pMsg); int32_t syncNodeOnPingReply(SSyncNode* ths, SyncPingReply* pMsg); -// --------------------------------- -static void syncNodeFreeCb(void* param) { - syncNodeClose(param); - param = NULL; -} - -int32_t syncInit() { - int32_t ret = 0; - - if (!syncEnvIsStart()) { - tsNodeRefId = taosOpenRef(200, syncNodeFreeCb); - if (tsNodeRefId < 0) { - sError("failed to init node ref"); - syncCleanUp(); - ret = -1; - } else { - sDebug("sync rsetId:%d is open", tsNodeRefId); - ret = syncEnvStart(); - } - } - - return ret; -} - -void syncCleanUp() { - int32_t ret = syncEnvStop(); - ASSERT(ret == 0); - - if (tsNodeRefId != -1) { - sDebug("sync rsetId:%d is closed", tsNodeRefId); - taosCloseRef(tsNodeRefId); - tsNodeRefId = -1; - } -} - int64_t syncOpen(SSyncInfo* pSyncInfo) { - SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo); - if (pSyncNode == NULL) { + SSyncNode* pNode = syncNodeOpen(pSyncInfo); + if (pNode == NULL) { sError("vgId:%d, failed to open sync node", pSyncInfo->vgId); return -1; } - pSyncNode->rid = taosAddRef(tsNodeRefId, pSyncNode); - if (pSyncNode->rid < 0) { - syncNodeClose(pSyncNode); - pSyncNode = NULL; + pNode->rid = syncNodeAdd(pNode); + if (pNode->rid < 0) { + syncNodeClose(pNode); return -1; } - sDebug("vgId:%d, sync rid:%" PRId64 " is added to rsetId:%d", pSyncInfo->vgId, pSyncNode->rid, tsNodeRefId); - return pSyncNode->rid; + return pNode->rid; } void syncStart(int64_t rid) { - SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid); - if (pSyncNode == NULL) { - return; - } - - if (pSyncNode->pRaftCfg->isStandBy) { - syncNodeStartStandBy(pSyncNode); - } else { - syncNodeStart(pSyncNode); - } - - taosReleaseRef(tsNodeRefId, pSyncNode->rid); -} - -void syncStartNormal(int64_t rid) { - SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid); - if (pSyncNode == NULL) { - return; - } - syncNodeStart(pSyncNode); - - taosReleaseRef(tsNodeRefId, pSyncNode->rid); -} - -void syncStartStandBy(int64_t rid) { - SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid); - if (pSyncNode == NULL) { - return; + SSyncNode* pNode = syncNodeAcquire(rid); + if (pNode != NULL) { + syncNodeStart(pNode); + syncNodeRelease(pNode); } - syncNodeStartStandBy(pSyncNode); - - taosReleaseRef(tsNodeRefId, pSyncNode->rid); } void syncStop(int64_t rid) { - SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid); - if (pSyncNode == NULL) return; - int32_t vgId = pSyncNode->vgId; - taosReleaseRef(tsNodeRefId, pSyncNode->rid); - - taosRemoveRef(tsNodeRefId, rid); - sDebug("vgId:%d, sync rid:%" PRId64 " is removed from rsetId:%d", vgId, rid, tsNodeRefId); -} - -int32_t syncSetStandby(int64_t rid) { - SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid); - if (pSyncNode == NULL) { - terrno = TSDB_CODE_SYN_INTERNAL_ERROR; - sError("failed to set standby since accquire ref error, rid:%" PRId64, rid); - return -1; + SSyncNode* pNode = syncNodeAcquire(rid); + if (pNode != NULL) { + syncNodeRelease(pNode); + syncNodeRemove(pNode); } - - if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) { - if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { - terrno = TSDB_CODE_SYN_IS_LEADER; - } else { - terrno = TSDB_CODE_SYN_STANDBY_NOT_READY; - } - sError("failed to set standby since it is not follower, state:%s rid:%" PRId64, syncStr(pSyncNode->state), rid); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); - return -1; - } - - // state change - pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER; - syncNodeStopHeartbeatTimer(pSyncNode); - - // reset elect timer, long enough - int32_t electMS = TIMER_MAX_MS; - int32_t ret = syncNodeRestartElectTimer(pSyncNode, electMS); - ASSERT(ret == 0); - - pSyncNode->pRaftCfg->isStandBy = 1; - raftCfgPersist(pSyncNode->pRaftCfg); - - taosReleaseRef(tsNodeRefId, pSyncNode->rid); - sInfo("vgId:%d, set to standby", pSyncNode->vgId); - return 0; } bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pNewCfg) { @@ -204,7 +97,7 @@ bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pNewCfg) { } int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { terrno = TSDB_CODE_SYN_INTERNAL_ERROR; return -1; @@ -213,7 +106,7 @@ int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg int32_t ret = 0; if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) { - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR; sError("invalid new config. vgId:%d", pSyncNode->vgId); return -1; @@ -227,12 +120,12 @@ int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg snprintf(pRpcMsg->pCont, pRpcMsg->contLen, "%s", newconfig); taosMemoryFree(newconfig); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return ret; } int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { terrno = TSDB_CODE_SYN_INTERNAL_ERROR; return -1; @@ -240,7 +133,7 @@ int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) { ASSERT(rid == pSyncNode->rid); if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) { - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR; sError("invalid new config. vgId:%d", pSyncNode->vgId); return -1; @@ -259,7 +152,7 @@ int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) { taosMemoryFree(newconfig); ret = syncNodePropose(pSyncNode, &rpcMsg, false); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return ret; #else syncNodeUpdateNewConfigIndex(pSyncNode, pNewCfg); @@ -275,13 +168,13 @@ int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) { syncNodeReplicate(pSyncNode); } - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return 0; #endif } int32_t syncLeaderTransfer(int64_t rid) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { terrno = TSDB_CODE_SYN_INTERNAL_ERROR; return -1; @@ -289,12 +182,12 @@ int32_t syncLeaderTransfer(int64_t rid) { ASSERT(rid == pSyncNode->rid); int32_t ret = syncNodeLeaderTransfer(pSyncNode); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return ret; } int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { terrno = TSDB_CODE_SYN_INTERNAL_ERROR; return -1; @@ -302,7 +195,7 @@ int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader) { ASSERT(rid == pSyncNode->rid); int32_t ret = syncNodeLeaderTransferTo(pSyncNode, newLeader); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return ret; } @@ -358,7 +251,7 @@ char* syncNodePeerState2Str(const SSyncNode* pSyncNode) { } int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { terrno = TSDB_CODE_SYN_INTERNAL_ERROR; return -1; @@ -382,7 +275,7 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) { logNum, isEmpty); syncNodeEventLog(pSyncNode, logBuf); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return 0; } @@ -411,7 +304,7 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) { syncNodeEventLog(pSyncNode, logBuf); } while (0); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return 0; } } @@ -424,7 +317,7 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) { lastApplyIndex, pSyncNode->minMatchIndex); syncNodeEventLog(pSyncNode, logBuf); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return 0; } @@ -433,7 +326,7 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) { snprintf(logBuf, sizeof(logBuf), "new-snapshot-index:%" PRId64 " candidate, do not delete wal", lastApplyIndex); syncNodeEventLog(pSyncNode, logBuf); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return 0; } else { @@ -442,7 +335,7 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) { lastApplyIndex); syncNodeEventLog(pSyncNode, logBuf); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return 0; } @@ -491,12 +384,12 @@ _DEL_WAL: } } while (0); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return code; } int32_t syncEndSnapshot(int64_t rid) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { terrno = TSDB_CODE_SYN_INTERNAL_ERROR; return -1; @@ -510,7 +403,7 @@ int32_t syncEndSnapshot(int64_t rid) { if (code != 0) { sError("vgId:%d, wal snapshot end error since:%s", pSyncNode->vgId, terrstr(terrno)); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return -1; } else { do { @@ -524,12 +417,12 @@ int32_t syncEndSnapshot(int64_t rid) { } } - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return code; } int32_t syncStepDown(int64_t rid, SyncTerm newTerm) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { terrno = TSDB_CODE_SYN_INTERNAL_ERROR; return -1; @@ -538,7 +431,7 @@ int32_t syncStepDown(int64_t rid, SyncTerm newTerm) { syncNodeStepDown(pSyncNode, newTerm); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return 0; } @@ -583,19 +476,19 @@ int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) { } bool syncCanLeaderTransfer(int64_t rid) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { return false; } ASSERT(rid == pSyncNode->rid); if (pSyncNode->replicaNum == 1) { - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return false; } if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) { - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return true; } @@ -610,7 +503,7 @@ bool syncCanLeaderTransfer(int64_t rid) { } } - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return matchOK; } @@ -620,25 +513,25 @@ int32_t syncForwardToPeer(int64_t rid, SRpcMsg* pMsg, bool isWeak) { } ESyncState syncGetMyRole(int64_t rid) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { return TAOS_SYNC_STATE_ERROR; } ASSERT(rid == pSyncNode->rid); ESyncState state = pSyncNode->state; - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return state; } bool syncIsReady(int64_t rid) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { return false; } ASSERT(rid == pSyncNode->rid); bool b = (pSyncNode->state == TAOS_SYNC_STATE_LEADER) && pSyncNode->restoreFinish; - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); // if false, set error code if (false == b) { @@ -652,14 +545,14 @@ bool syncIsReady(int64_t rid) { } bool syncIsRestoreFinish(int64_t rid) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { return false; } ASSERT(rid == pSyncNode->rid); bool b = pSyncNode->restoreFinish; - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return b; } @@ -668,7 +561,7 @@ int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapsho return -1; } - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { return -1; } @@ -680,7 +573,7 @@ int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapsho if (pEntry != NULL) { syncEntryDestory(pEntry); } - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return -1; } ASSERT(pEntry != NULL); @@ -691,12 +584,12 @@ int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapsho pSnapshot->lastConfigIndex = syncNodeGetSnapshotConfigIndex(pSyncNode, index); syncEntryDestory(pEntry); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return 0; } int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { return -1; } @@ -705,12 +598,12 @@ int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) { sTrace("vgId:%d, get snapshot meta, lastConfigIndex:%" PRId64, pSyncNode->vgId, pSyncNode->pRaftCfg->lastConfigIndex); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return 0; } int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { return -1; } @@ -729,7 +622,7 @@ int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct sTrace("vgId:%d, get snapshot meta by index:%" PRId64 " lcindex:%" PRId64, pSyncNode->vgId, snapshotIndex, sMeta->lastConfigIndex); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return 0; } @@ -755,67 +648,67 @@ const char* syncGetMyRoleStr(int64_t rid) { } bool syncRestoreFinish(int64_t rid) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { return false; } ASSERT(rid == pSyncNode->rid); bool restoreFinish = pSyncNode->restoreFinish; - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return restoreFinish; } SyncTerm syncGetMyTerm(int64_t rid) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { return TAOS_SYNC_STATE_ERROR; } ASSERT(rid == pSyncNode->rid); SyncTerm term = pSyncNode->pRaftStore->currentTerm; - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return term; } SyncIndex syncGetLastIndex(int64_t rid) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { return SYNC_INDEX_INVALID; } ASSERT(rid == pSyncNode->rid); SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return lastIndex; } SyncIndex syncGetCommitIndex(int64_t rid) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { return SYNC_INDEX_INVALID; } ASSERT(rid == pSyncNode->rid); SyncIndex cmtIndex = pSyncNode->commitIndex; - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return cmtIndex; } SyncGroupId syncGetVgId(int64_t rid) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { return TAOS_SYNC_STATE_ERROR; } ASSERT(rid == pSyncNode->rid); SyncGroupId vgId = pSyncNode->vgId; - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return vgId; } void syncGetEpSet(int64_t rid, SEpSet* pEpSet) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { memset(pEpSet, 0, sizeof(*pEpSet)); return; @@ -831,11 +724,11 @@ void syncGetEpSet(int64_t rid, SEpSet* pEpSet) { pEpSet->inUse = pSyncNode->pRaftCfg->cfg.myIndex; sInfo("vgId:%d, sync get epset in-use:%d", pSyncNode->vgId, pEpSet->inUse); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); } void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { memset(pEpSet, 0, sizeof(*pEpSet)); return; @@ -854,11 +747,11 @@ void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) { } sInfo("vgId:%d, sync get retry epset in-use:%d", pSyncNode->vgId, pEpSet->inUse); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); } int32_t syncGetRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { return TAOS_SYNC_STATE_ERROR; } @@ -870,12 +763,12 @@ int32_t syncGetRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) { memcpy(msg, &(stub.rpcMsg), sizeof(SRpcMsg)); } - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return ret; } int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcHandleInfo* pInfo) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { return TAOS_SYNC_STATE_ERROR; } @@ -888,12 +781,12 @@ int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcHandleInfo* pInfo) } sTrace("vgId:%d, get seq:%" PRIu64 " rpc handle:%p", pSyncNode->vgId, index, pInfo->handle); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return ret; } void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { sTrace("syncSetQ get pSyncNode is NULL, rid:%" PRId64, rid); return; @@ -901,24 +794,24 @@ void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb) { ASSERT(rid == pSyncNode->rid); pSyncNode->msgcb = msgcb; - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); } char* sync2SimpleStr(int64_t rid) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { sTrace("syncSetRpc get pSyncNode is NULL, rid:%" PRId64, rid); return NULL; } ASSERT(rid == pSyncNode->rid); char* s = syncNode2SimpleStr(pSyncNode); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return s; } void setPingTimerMS(int64_t rid, int32_t pingTimerMS) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { return; } @@ -926,22 +819,22 @@ void setPingTimerMS(int64_t rid, int32_t pingTimerMS) { pSyncNode->pingBaseLine = pingTimerMS; pSyncNode->pingTimerMS = pingTimerMS; - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); } void setElectTimerMS(int64_t rid, int32_t electTimerMS) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { return; } ASSERT(rid == pSyncNode->rid); pSyncNode->electBaseLine = electTimerMS; - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); } void setHeartbeatTimerMS(int64_t rid, int32_t hbTimerMS) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { return; } @@ -949,20 +842,20 @@ void setHeartbeatTimerMS(int64_t rid, int32_t hbTimerMS) { pSyncNode->hbBaseLine = hbTimerMS; pSyncNode->heartbeatTimerMS = hbTimerMS; - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); } int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) { - SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { - taosReleaseRef(tsNodeRefId, rid); + syncNodeRelease(pSyncNode); terrno = TSDB_CODE_SYN_INTERNAL_ERROR; return -1; } ASSERT(rid == pSyncNode->rid); int32_t ret = syncNodePropose(pSyncNode, pMsg, isWeak); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return ret; } @@ -1084,7 +977,7 @@ int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId de int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { int32_t ret = 0; - if (syncEnvIsStart()) { + if (syncIsInit()) { SSyncHbTimerData* pData = taosMemoryMalloc(sizeof(SSyncHbTimerData)); pData->pSyncNode = pSyncNode; pData->pTimer = pSyncTimer; @@ -1092,7 +985,7 @@ int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { pData->logicClock = pSyncTimer->logicClock; pSyncTimer->pData = pData; - taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS, pData, gSyncEnv->pTimerManager, &pSyncTimer->pTimer); + taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS, pData, syncEnv()->pTimerManager, &pSyncTimer->pTimer); } else { sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId); } @@ -1556,8 +1449,8 @@ int32_t syncNodePingAll(SSyncNode* pSyncNode) { // timer control -------------- int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) { int32_t ret = 0; - if (syncEnvIsStart()) { - taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager, + if (syncIsInit()) { + taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, pSyncNode, syncEnv()->pTimerManager, &pSyncNode->pPingTimer); atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser); } else { @@ -1576,7 +1469,7 @@ int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) { int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) { int32_t ret = 0; - if (syncEnvIsStart()) { + if (syncIsInit()) { pSyncNode->electTimerMS = ms; SElectTimer* pElectTimer = taosMemoryMalloc(sizeof(SElectTimer)); @@ -1584,7 +1477,7 @@ int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) { pElectTimer->pSyncNode = pSyncNode; pElectTimer->pData = NULL; - taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pElectTimer, gSyncEnv->pTimerManager, + taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pElectTimer, syncEnv()->pTimerManager, &pSyncNode->pElectTimer); } else { @@ -1632,8 +1525,8 @@ int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode) { static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) { int32_t ret = 0; - if (syncEnvIsStart()) { - taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager, + if (syncIsInit()) { + taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, syncEnv()->pTimerManager, &pSyncNode->pHeartbeatTimer); atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser); } else { @@ -2325,17 +2218,6 @@ _END: return; } -SSyncNode* syncNodeAcquire(int64_t rid) { - SSyncNode* pNode = taosAcquireRef(tsNodeRefId, rid); - if (pNode == NULL) { - sTrace("failed to acquire node from refId:%" PRId64, rid); - } - - return pNode; -} - -void syncNodeRelease(SSyncNode* pNode) { taosReleaseRef(tsNodeRefId, pNode->rid); } - // raft state change -------------- void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) { if (term > pSyncNode->pRaftStore->currentTerm) { @@ -2786,8 +2668,8 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { } syncTimeoutDestroy(pSyncMsg); - if (syncEnvIsStart()) { - taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager, + if (syncIsInit()) { + taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, syncEnv()->pTimerManager, &pSyncNode->pPingTimer); } else { sError("sync env is stop, syncNodeEqPingTimer"); @@ -2832,9 +2714,9 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { #if 0 // reset timer ms - if (syncEnvIsStart() && pSyncNode->electBaseLine > 0) { + if (syncIsInit() && pSyncNode->electBaseLine > 0) { pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine); - taosTmrReset(syncNodeEqElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager, + taosTmrReset(syncNodeEqElectTimer, pSyncNode->electTimerMS, pSyncNode, syncEnv()->pTimerManager, &pSyncNode->pElectTimer); } else { sError("sync env is stop, syncNodeEqElectTimer"); @@ -2869,8 +2751,8 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { } syncTimeoutDestroy(pSyncMsg); - if (syncEnvIsStart()) { - taosTmrReset(syncNodeEqHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager, + if (syncIsInit()) { + taosTmrReset(syncNodeEqHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, syncEnv()->pTimerManager, &pSyncNode->pHeartbeatTimer); } else { sError("sync env is stop, syncNodeEqHeartbeatTimer"); @@ -2930,8 +2812,8 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) { syncHeartbeatDestroy(pSyncMsg); - if (syncEnvIsStart()) { - taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS, pData, gSyncEnv->pTimerManager, + if (syncIsInit()) { + taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS, pData, syncEnv()->pTimerManager, &pSyncTimer->pTimer); } else { sError("sync env is stop, syncNodeEqHeartbeatTimer"); diff --git a/source/libs/sync/test/syncElectTest.cpp b/source/libs/sync/test/syncElectTest.cpp index 862f7bd0baebed56d2e103c56ae1a88e9074c8ea..d09879a69936db6d873ecafb86cc3c9bab3640be 100644 --- a/source/libs/sync/test/syncElectTest.cpp +++ b/source/libs/sync/test/syncElectTest.cpp @@ -98,7 +98,7 @@ int main(int argc, char** argv) { init(); int32_t ret = syncIOStart((char*)"127.0.0.1", gPorts[myIndex]); assert(ret == 0); - ret = syncEnvStart(); + ret = syncInit(); assert(ret == 0); char walPath[128]; diff --git a/source/libs/sync/test/syncEncodeTest.cpp b/source/libs/sync/test/syncEncodeTest.cpp index 9f1a81e7ed8c3805874ba36c9e9a696406dc8314..fdb5cf7ac89b885e4ff8a4a8d6ef2a83ebc57c1f 100644 --- a/source/libs/sync/test/syncEncodeTest.cpp +++ b/source/libs/sync/test/syncEncodeTest.cpp @@ -152,7 +152,7 @@ int main(int argc, char **argv) { int32_t ret = syncIOStart((char *)"127.0.0.1", ports[myIndex]); assert(ret == 0); - ret = syncEnvStart(); + ret = syncInit(); assert(ret == 0); taosRemoveDir("./wal_test"); diff --git a/source/libs/sync/test/syncEnqTest.cpp b/source/libs/sync/test/syncEnqTest.cpp index 8461bfe9b7403c70d45a79f59474ad5d0ca43b2a..191e245b1e26e6cefc85a2bab6529a1601823275 100644 --- a/source/libs/sync/test/syncEnqTest.cpp +++ b/source/libs/sync/test/syncEnqTest.cpp @@ -81,7 +81,7 @@ int main(int argc, char** argv) { int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); assert(ret == 0); - ret = syncEnvStart(); + ret = syncInit(); assert(ret == 0); SSyncNode* pSyncNode = syncInitTest(); diff --git a/source/libs/sync/test/syncEnvTest.cpp b/source/libs/sync/test/syncEnvTest.cpp index a7a819e0465b34de466c132934bc2b4c209b2fd1..404ab32d582c3347720bc6997fa154be82c521c0 100644 --- a/source/libs/sync/test/syncEnvTest.cpp +++ b/source/libs/sync/test/syncEnvTest.cpp @@ -22,7 +22,7 @@ int main() { logTest(); - ret = syncEnvStart(); + ret = syncInit(); assert(ret == 0); for (int i = 0; i < 5; ++i) { @@ -37,8 +37,6 @@ int main() { taosMsleep(5000); } - ret = syncEnvStop(); - assert(ret == 0); - + syncCleanUp(); return 0; } diff --git a/source/libs/sync/test/syncIOSendMsgTest.cpp b/source/libs/sync/test/syncIOSendMsgTest.cpp index 630d96054bef043134b76233683551abe682bd6c..4a457136c2e924a0b70b891997b8da45e96f3775 100644 --- a/source/libs/sync/test/syncIOSendMsgTest.cpp +++ b/source/libs/sync/test/syncIOSendMsgTest.cpp @@ -82,7 +82,7 @@ int main(int argc, char** argv) { int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); assert(ret == 0); - ret = syncEnvStart(); + ret = syncInit(); assert(ret == 0); SSyncNode* pSyncNode = syncInitTest(); diff --git a/source/libs/sync/test/syncInitTest.cpp b/source/libs/sync/test/syncInitTest.cpp index d0843151f4b3b04c2c67d44b4cdce3a48e78d6d8..d654ad06feb2a73132c4c1fe6f57f7465b619d23 100644 --- a/source/libs/sync/test/syncInitTest.cpp +++ b/source/libs/sync/test/syncInitTest.cpp @@ -81,7 +81,7 @@ int main(int argc, char** argv) { int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); assert(ret == 0); - ret = syncEnvStart(); + ret = syncInit(); assert(ret == 0); SSyncNode* pSyncNode = syncInitTest(); @@ -91,7 +91,7 @@ int main(int argc, char** argv) { initRaftId(pSyncNode); syncNodeClose(pSyncNode); - syncEnvStop(); + syncCleanUp(); // syncIOStop(); // taosCloseLog(); diff --git a/source/libs/sync/test/syncPingSelfTest.cpp b/source/libs/sync/test/syncPingSelfTest.cpp index 99287bf7b0fd23c350e7d5f8503b17efee641b7e..f44cbb04d51ac747b7737c8060cd0ba3b98aa921 100644 --- a/source/libs/sync/test/syncPingSelfTest.cpp +++ b/source/libs/sync/test/syncPingSelfTest.cpp @@ -81,7 +81,7 @@ int main(int argc, char** argv) { int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); assert(ret == 0); - ret = syncEnvStart(); + ret = syncInit(); assert(ret == 0); SSyncNode* pSyncNode = syncInitTest(); diff --git a/source/libs/sync/test/syncPingTimerTest.cpp b/source/libs/sync/test/syncPingTimerTest.cpp index cd9440e3e280b71820ca5db8ed40bcc8b2c37d26..fd6342aa84c389ce9b5bd583eff9a56bffccb55e 100644 --- a/source/libs/sync/test/syncPingTimerTest.cpp +++ b/source/libs/sync/test/syncPingTimerTest.cpp @@ -81,7 +81,7 @@ int main(int argc, char** argv) { int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); assert(ret == 0); - ret = syncEnvStart(); + ret = syncInit(); assert(ret == 0); SSyncNode* pSyncNode = syncInitTest(); diff --git a/source/libs/sync/test/syncPingTimerTest2.cpp b/source/libs/sync/test/syncPingTimerTest2.cpp index fa09d04368178e7b7ab6c94cd53c7672174bdf7b..295003dff3761237a9e8d2dd042457bb1b53d663 100644 --- a/source/libs/sync/test/syncPingTimerTest2.cpp +++ b/source/libs/sync/test/syncPingTimerTest2.cpp @@ -81,7 +81,7 @@ int main(int argc, char** argv) { int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); assert(ret == 0); - ret = syncEnvStart(); + ret = syncInit(); assert(ret == 0); SSyncNode* pSyncNode = syncInitTest(); diff --git a/source/libs/sync/test/syncSnapshotTest.cpp b/source/libs/sync/test/syncSnapshotTest.cpp index 3dc85180728c4a287758156b47309e48a7cc01cd..50771ac476babe2565df792ddc87d74ca36d7b3c 100644 --- a/source/libs/sync/test/syncSnapshotTest.cpp +++ b/source/libs/sync/test/syncSnapshotTest.cpp @@ -179,7 +179,7 @@ int main(int argc, char **argv) { int32_t ret = syncIOStart((char *)"127.0.0.1", ports[myIndex]); assert(ret == 0); - ret = syncEnvStart(); + ret = syncInit(); assert(ret == 0); // taosRemoveDir(pWalDir); diff --git a/source/libs/sync/test/syncVotesGrantedTest.cpp b/source/libs/sync/test/syncVotesGrantedTest.cpp index d4885d0316b3a099f1b1e1c0bca45b4f1f471849..e2e8748697f22dfd2ad03302812dcfe41291984a 100644 --- a/source/libs/sync/test/syncVotesGrantedTest.cpp +++ b/source/libs/sync/test/syncVotesGrantedTest.cpp @@ -82,7 +82,7 @@ int main(int argc, char** argv) { int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); assert(ret == 0); - ret = syncEnvStart(); + ret = syncInit(); assert(ret == 0); SSyncNode* pSyncNode = syncInitTest(); diff --git a/source/libs/sync/test/syncVotesRespondTest.cpp b/source/libs/sync/test/syncVotesRespondTest.cpp index 77262dfc65e0f134af75e8a11be674c40a1437ab..881a5331b13072f7c4e10f3ee12a161d72d2c4f9 100644 --- a/source/libs/sync/test/syncVotesRespondTest.cpp +++ b/source/libs/sync/test/syncVotesRespondTest.cpp @@ -82,7 +82,7 @@ int main(int argc, char** argv) { int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); assert(ret == 0); - ret = syncEnvStart(); + ret = syncInit(); assert(ret == 0); SSyncNode* pSyncNode = syncInitTest(); diff --git a/source/libs/sync/test/syncWriteTest.cpp b/source/libs/sync/test/syncWriteTest.cpp index b185f52f7547d1a60541177f3a7374f3ed694081..fee98ddd52c33192efb4bdc6d0c707269f3c0743 100644 --- a/source/libs/sync/test/syncWriteTest.cpp +++ b/source/libs/sync/test/syncWriteTest.cpp @@ -154,7 +154,7 @@ int main(int argc, char **argv) { int32_t ret = syncIOStart((char *)"127.0.0.1", ports[myIndex]); assert(ret == 0); - ret = syncEnvStart(); + ret = syncInit(); assert(ret == 0); taosRemoveDir("./wal_test"); diff --git a/source/util/src/tref.c b/source/util/src/tref.c index c984ef3f34dd50b669acf12f14233d6f7374f3b5..aa741b909aac146a4b073ad04aac03a4e21fab5a 100644 --- a/source/util/src/tref.c +++ b/source/util/src/tref.c @@ -57,7 +57,7 @@ static void taosIncRsetCount(SRefSet *pSet); static void taosDecRsetCount(SRefSet *pSet); static int32_t taosDecRefCount(int32_t rsetId, int64_t rid, int32_t remove); -int32_t taosOpenRef(int32_t max, void (*fp)(void *)) { +int32_t taosOpenRef(int32_t max, RefFp fp) { SRefNode **nodeList; SRefSet *pSet; int64_t *lockedBy;