From acc5c53b126e079f25613ab15c3c1354ce031343 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Thu, 5 Nov 2020 13:15:23 +0000 Subject: [PATCH] make changes on app --- src/client/inc/tsclient.h | 3 +- src/client/src/tscServer.c | 24 ++++----- src/client/src/tscSql.c | 24 ++++----- src/client/src/tscUtil.c | 2 +- src/inc/trpc.h | 4 +- src/inc/tsync.h | 14 ++--- src/mnode/src/mnodeSdb.c | 6 +-- src/rpc/src/rpcMain.c | 6 +-- src/sync/inc/syncInt.h | 1 + src/sync/src/syncMain.c | 104 +++++++++++++++++-------------------- src/sync/test/syncServer.c | 2 +- src/vnode/inc/vnodeInt.h | 2 +- src/vnode/src/vnodeMain.c | 20 +++---- src/wal/inc/walInt.h | 1 + src/wal/src/walMgmt.c | 9 ++-- 15 files changed, 106 insertions(+), 116 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 1c2821638b..4c85af4919 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -330,6 +330,7 @@ typedef struct STscObj { char writeAuth : 1; char superAuth : 1; uint32_t connId; + uint64_t rid; // ref ID returned by taosAddRef struct SSqlObj * pHb; struct SSqlObj * sqlList; struct SSqlStream *streamList; @@ -348,7 +349,7 @@ typedef struct SSqlObj { void *signature; pthread_t owner; // owner of sql object, by which it is executed STscObj *pTscObj; - void *pRpcCtx; + int64_t rpcRid; void (*fp)(); void (*fetchFp)(); void *param; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 1584afe706..a3e2eba168 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -182,27 +182,23 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { int32_t waitingDuring = tsShellActivityTimer * 500; tscDebug("%p send heartbeat in %dms", pSql, waitingDuring); - taosTmrReset(tscProcessActivityTimer, waitingDuring, pObj, tscTmr, &pObj->pTimer); + taosTmrReset(tscProcessActivityTimer, waitingDuring, (void *)pObj->rid, tscTmr, &pObj->pTimer); } else { tscDebug("%p start to close tscObj:%p, not send heartbeat again", pSql, pObj); } } void tscProcessActivityTimer(void *handle, void *tmrId) { - STscObj *pObj = (STscObj *)handle; - - int ret = taosAcquireRef(tscRefId, pObj); - if (ret < 0) { - tscTrace("%p failed to acquire TSC obj, reason:%s", pObj, tstrerror(ret)); - return; - } + int64_t rid = (int64_t) handle; + STscObj *pObj = taosAcquireRef(tscRefId, rid); + if (pObj == NULL) return; SSqlObj* pHB = pObj->pHb; void** p = taosCacheAcquireByKey(tscObjCache, &pHB, sizeof(TSDB_CACHE_PTR_TYPE)); if (p == NULL) { tscWarn("%p HB object has been released already", pHB); - taosReleaseRef(tscRefId, pObj); + taosReleaseRef(tscRefId, pObj->rid); return; } @@ -216,7 +212,7 @@ void tscProcessActivityTimer(void *handle, void *tmrId) { tscError("%p failed to sent HB to server, reason:%s", pHB, tstrerror(code)); } - taosReleaseRef(tscRefId, pObj); + taosReleaseRef(tscRefId, rid); } int tscSendMsgToServer(SSqlObj *pSql) { @@ -241,7 +237,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { .pCont = pMsg, .contLen = pSql->cmd.payloadLen, .ahandle = pSql, - .handle = &pSql->pRpcCtx, + .handle = NULL, .code = 0 }; @@ -249,7 +245,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { // Otherwise, the pSql object may have been released already during the response function, which is // processMsgFromServer function. In the meanwhile, the assignment of the rpc context to sql object will absolutely // cause crash. - rpcSendRequest(pObj->pDnodeConn, &pSql->epSet, &rpcMsg); + pSql->rpcRid = rpcSendRequest(pObj->pDnodeConn, &pSql->epSet, &rpcMsg); return TSDB_CODE_SUCCESS; } @@ -269,7 +265,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { SSqlCmd *pCmd = &pSql->cmd; assert(*pSql->self == pSql); - pSql->pRpcCtx = NULL; + pSql->rpcRid = -1; if (pObj->signature != pObj) { tscDebug("%p DB connection is closed, cmd:%d pObj:%p signature:%p", pSql, pCmd->command, pObj, pObj->signature); @@ -2026,7 +2022,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) { createHBObj(pObj); //launch a timer to send heartbeat to maintain the connection and send status to mnode - taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer); + taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, (void *)pObj->rid, tscTmr, &pObj->pTimer); return 0; } diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 89dfa24e8f..bdc46c5446 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -161,7 +161,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa registerSqlObj(pSql); tsInsertHeadSize = sizeof(SMsgDesc) + sizeof(SSubmitMsg); - taosAddRef(tscRefId, pObj); + pObj->rid = taosAddRef(tscRefId, pObj); return pSql; } @@ -279,9 +279,9 @@ void taos_close(TAOS *taos) { SSqlObj* pHb = pObj->pHb; if (pHb != NULL && atomic_val_compare_exchange_ptr(&pObj->pHb, pHb, 0) == pHb) { - if (pHb->pRpcCtx != NULL) { // wait for rsp from dnode - rpcCancelRequest(pHb->pRpcCtx); - pHb->pRpcCtx = NULL; + if (pHb->rpcRid > 0) { // wait for rsp from dnode + rpcCancelRequest(pHb->rpcRid); + pHb->rpcRid = -1; } tscDebug("%p HB is freed", pHb); @@ -298,7 +298,7 @@ void taos_close(TAOS *taos) { tscDebug("%p all sqlObj are freed, free tscObj and close dnodeConn:%p", pObj, pObj->pDnodeConn); - taosRemoveRef(tscRefId, pObj); + taosRemoveRef(tscRefId, pObj->rid); } void waitForQueryRsp(void *param, TAOS_RES *tres, int code) { @@ -748,9 +748,9 @@ static void tscKillSTableQuery(SSqlObj *pSql) { assert(pSubObj->self == (SSqlObj**) p); pSubObj->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; - if (pSubObj->pRpcCtx != NULL) { - rpcCancelRequest(pSubObj->pRpcCtx); - pSubObj->pRpcCtx = NULL; + if (pSubObj->rpcRid > 0) { + rpcCancelRequest(pSubObj->rpcRid); + pSubObj->rpcRid = -1; } tscQueueAsyncRes(pSubObj); @@ -775,7 +775,7 @@ void taos_stop_query(TAOS_RES *res) { SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { - assert(pSql->pRpcCtx == NULL); + assert(pSql->rpcRid <= 0); tscKillSTableQuery(pSql); } else { if (pSql->cmd.command < TSDB_SQL_LOCAL) { @@ -784,9 +784,9 @@ void taos_stop_query(TAOS_RES *res) { * reset and freed in the processMsgFromServer function, and causes the invalid * write problem for rpcCancelRequest. */ - if (pSql->pRpcCtx != NULL) { - rpcCancelRequest(pSql->pRpcCtx); - pSql->pRpcCtx = NULL; + if (pSql->rpcRid > 0) { + rpcCancelRequest(pSql->rpcRid); + pSql->rpcRid = -1; } tscQueueAsyncRes(pSql); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 879eeeaded..2aee90653d 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -376,7 +376,7 @@ void tscFreeRegisteredSqlObj(void *pSql) { tscDebug("%p free sqlObj completed, tscObj:%p ref:%d", *p, pTscObj, ref); if (ref == 0) { tscDebug("%p all sqlObj freed, free tscObj:%p", *p, pTscObj); - taosRemoveRef(tscRefId, pTscObj); + taosRemoveRef(tscRefId, pTscObj->rid); } } diff --git a/src/inc/trpc.h b/src/inc/trpc.h index bdee917b5e..e430a43807 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -83,13 +83,13 @@ void rpcClose(void *); void *rpcMallocCont(int contLen); void rpcFreeCont(void *pCont); void *rpcReallocCont(void *ptr, int contLen); -void rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg); +int64_t rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg); void rpcSendResponse(const SRpcMsg *pMsg); void rpcSendRedirectRsp(void *pConn, const SRpcEpSet *pEpSet); int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); int rpcReportProgress(void *pConn, char *pCont, int contLen); -void rpcCancelRequest(void *pContext); +void rpcCancelRequest(int64_t rid); #ifdef __cplusplus } diff --git a/src/inc/tsync.h b/src/inc/tsync.h index 671adefab8..1b16fef84c 100644 --- a/src/inc/tsync.h +++ b/src/inc/tsync.h @@ -106,13 +106,13 @@ typedef void* tsync_h; int32_t syncInit(); void syncCleanUp(); -tsync_h syncStart(const SSyncInfo *); -void syncStop(tsync_h shandle); -int32_t syncReconfig(tsync_h shandle, const SSyncCfg *); -int32_t syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle, int qtype); -void syncConfirmForward(tsync_h shandle, uint64_t version, int32_t code); -void syncRecover(tsync_h shandle); // recover from other nodes: -int syncGetNodesRole(tsync_h shandle, SNodesRole *); +int64_t syncStart(const SSyncInfo *); +void syncStop(int64_t rid); +int32_t syncReconfig(int64_t rid, const SSyncCfg *); +int32_t syncForwardToPeer(int64_t rid, void *pHead, void *mhandle, int qtype); +void syncConfirmForward(int64_t rid, uint64_t version, int32_t code); +void syncRecover(int64_t rid); // recover from other nodes: +int syncGetNodesRole(int64_t rid, SNodesRole *); extern char *syncRole[]; diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 8fb0b33060..f6acb6826c 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -72,7 +72,7 @@ typedef struct { ESyncRole role; ESdbStatus status; int64_t version; - void * sync; + int64_t sync; void * wal; SSyncCfg cfg; int32_t numOfTables; @@ -212,7 +212,7 @@ static void sdbRestoreTables() { } void sdbUpdateMnodeRoles() { - if (tsSdbObj.sync == NULL) return; + if (tsSdbObj.sync <= 0) return; SNodesRole roles = {0}; syncGetNodesRole(tsSdbObj.sync, &roles); @@ -433,7 +433,7 @@ void sdbCleanUp() { if (tsSdbObj.sync) { syncStop(tsSdbObj.sync); - tsSdbObj.sync = NULL; + tsSdbObj.sync = -1; } if (tsSdbObj.wal) { diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 05330ebff8..dc67f6a80f 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -560,7 +560,7 @@ void rpcCancelRequest(int64_t rid) { rpcCloseConn(pContext->pConn); - taosReleaseRef(tsRpcRefId, pContext); + taosReleaseRef(tsRpcRefId, rid); } static void rpcFreeMsg(void *msg) { @@ -629,7 +629,7 @@ static void rpcReleaseConn(SRpcConn *pConn) { // if there is an outgoing message, free it if (pConn->outType && pConn->pReqMsg) { if (pConn->pContext) pConn->pContext->pConn = NULL; - taosRemoveRef(tsRpcRefId, pConn->pContext); + taosRemoveRef(tsRpcRefId, pConn->pContext->rid); } } @@ -1110,7 +1110,7 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) { } // free the request message - taosRemoveRef(tsRpcRefId, pContext); + taosRemoveRef(tsRpcRefId, pContext->rid); } static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext) { diff --git a/src/sync/inc/syncInt.h b/src/sync/inc/syncInt.h index f681810646..8808a82c46 100644 --- a/src/sync/inc/syncInt.h +++ b/src/sync/inc/syncInt.h @@ -141,6 +141,7 @@ typedef struct SSyncNode { int8_t replica; int8_t quorum; uint32_t vgId; + int64_t rid; void *ahandle; int8_t selfIndex; SSyncPeer *peerInfo[TAOS_SYNC_MAX_REPLICA+1]; // extra one for arbitrator diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index 6f5e3be8ab..74e327183b 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -142,14 +142,14 @@ void syncCleanUp() { sInfo("sync module is cleaned up"); } -void *syncStart(const SSyncInfo *pInfo) { +int64_t syncStart(const SSyncInfo *pInfo) { const SSyncCfg *pCfg = &pInfo->syncCfg; SSyncNode *pNode = (SSyncNode *)calloc(sizeof(SSyncNode), 1); if (pNode == NULL) { sError("no memory to allocate syncNode"); terrno = TAOS_SYSTEM_ERROR(errno); - return NULL; + return -1; } tstrncpy(pNode->path, pInfo->path, sizeof(pNode->path)); @@ -170,10 +170,10 @@ void *syncStart(const SSyncInfo *pInfo) { pNode->quorum = pCfg->quorum; if (pNode->quorum > pNode->replica) pNode->quorum = pNode->replica; - int ret = taosAddRef(tsSyncRefId, pNode); - if (ret < 0) { + pNode->rid = taosAddRef(tsSyncRefId, pNode); + if (pNode->rid < 0) { syncFreeNode(pNode); - return NULL; + return -1; } for (int i = 0; i < pCfg->replica; ++i) { @@ -187,8 +187,8 @@ void *syncStart(const SSyncInfo *pInfo) { if (pNode->selfIndex < 0) { sInfo("vgId:%d, this node is not configured", pNode->vgId); terrno = TSDB_CODE_SYN_INVALID_CONFIG; - syncStop(pNode); - return NULL; + syncStop(pNode->rid); + return -1; } nodeVersion = pInfo->version; // set the initial version @@ -200,15 +200,15 @@ void *syncStart(const SSyncInfo *pInfo) { if (pNode->pSyncFwds == NULL) { sError("vgId:%d, no memory to allocate syncFwds", pNode->vgId); terrno = TAOS_SYSTEM_ERROR(errno); - syncStop(pNode); - return NULL; + syncStop(pNode->rid); + return -1; } - pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, pNode, syncTmrCtrl); + pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, (void *)pNode->rid, syncTmrCtrl); if (pNode->pFwdTimer == NULL) { sError("vgId:%d, failed to allocate timer", pNode->vgId); - syncStop(pNode); - return NULL; + syncStop(pNode->rid); + return -1; } syncAddArbitrator(pNode); @@ -218,15 +218,14 @@ void *syncStart(const SSyncInfo *pInfo) { (*pNode->notifyRole)(pNode->ahandle, nodeRole); } - return pNode; + return pNode->rid; } -void syncStop(void *param) { - SSyncNode *pNode = param; +void syncStop(int64_t rid) { SSyncPeer *pPeer; - int ret = taosAcquireRef(tsSyncRefId, pNode); - if (ret < 0) return; + SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid); + if (pNode == NULL) return; sInfo("vgId:%d, cleanup sync", pNode->vgId); @@ -245,16 +244,15 @@ void syncStop(void *param) { pthread_mutex_unlock(&(pNode->mutex)); - taosReleaseRef(tsSyncRefId, pNode); - taosRemoveRef(tsSyncRefId, pNode); + taosReleaseRef(tsSyncRefId, rid); + taosRemoveRef(tsSyncRefId, rid); } -int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) { - SSyncNode *pNode = param; +int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) { int i, j; - int ret = taosAcquireRef(tsSyncRefId, pNode); - if (ret < 0) return TSDB_CODE_SYN_INVALID_CONFIG; + SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid); + if (pNode == NULL) return TSDB_CODE_SYN_INVALID_CONFIG; sInfo("vgId:%d, reconfig, role:%s replica:%d old:%d", pNode->vgId, syncRole[nodeRole], pNewCfg->replica, pNode->replica); @@ -318,29 +316,25 @@ int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) { syncRole[nodeRole]); syncBroadcastStatus(pNode); - taosReleaseRef(tsSyncRefId, pNode); + taosReleaseRef(tsSyncRefId, rid); return 0; } -int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) { - SSyncNode *pNode = param; - - int ret = taosAcquireRef(tsSyncRefId, pNode); - if (ret < 0) return 0; +int32_t syncForwardToPeer(int64_t rid, void *data, void *mhandle, int qtype) { + SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid); + if (pNode == NULL) return 0; int32_t code = syncForwardToPeerImpl(pNode, data, mhandle, qtype); - taosReleaseRef(tsSyncRefId, pNode); + taosReleaseRef(tsSyncRefId, rid); return code; } -void syncConfirmForward(void *param, uint64_t version, int32_t code) { - SSyncNode *pNode = param; - - int ret = taosAcquireRef(tsSyncRefId, pNode); - if (ret < 0) return; +void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) { + SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid); + if (pNode == NULL) return; SSyncPeer *pPeer = pNode->pMaster; if (pPeer && pNode->quorum > 1) { @@ -365,15 +359,14 @@ void syncConfirmForward(void *param, uint64_t version, int32_t code) { } } - taosReleaseRef(tsSyncRefId, pNode); + taosReleaseRef(tsSyncRefId, rid); } -void syncRecover(void *param) { - SSyncNode *pNode = param; +void syncRecover(int64_t rid) { SSyncPeer *pPeer; - int ret = taosAcquireRef(tsSyncRefId, pNode); - if (ret < 0) return; + SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid); + if (pNode == NULL) return; // to do: add a few lines to check if recover is OK // if take this node to unsync state, the whole system may not work @@ -393,14 +386,12 @@ void syncRecover(void *param) { pthread_mutex_unlock(&(pNode->mutex)); - taosReleaseRef(tsSyncRefId, pNode); + taosReleaseRef(tsSyncRefId, rid); } -int syncGetNodesRole(void *param, SNodesRole *pNodesRole) { - SSyncNode *pNode = param; - - int ret = taosAcquireRef(tsSyncRefId, pNode); - if (ret < 0) return -1; +int syncGetNodesRole(int64_t rid, SNodesRole *pNodesRole) { + SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid); + if (pNode == NULL) return -1; pNodesRole->selfIndex = pNode->selfIndex; for (int i = 0; i < pNode->replica; ++i) { @@ -408,7 +399,7 @@ int syncGetNodesRole(void *param, SNodesRole *pNodesRole) { pNodesRole->role[i] = pNode->peerInfo[i]->role; } - taosReleaseRef(tsSyncRefId, pNode); + taosReleaseRef(tsSyncRefId, rid); return 0; } @@ -455,7 +446,7 @@ void syncAddPeerRef(SSyncPeer *pPeer) { atomic_add_fetch_8(&pPeer->refCount, 1); int syncDecPeerRef(SSyncPeer *pPeer) { if (atomic_sub_fetch_8(&pPeer->refCount, 1) == 0) { - taosReleaseRef(tsSyncRefId, pPeer->pSyncNode); + taosReleaseRef(tsSyncRefId, pPeer->pSyncNode->rid); sDebug("%s, resource is freed", pPeer->id); taosTFree(pPeer->watchFd); @@ -512,7 +503,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) { taosTmrReset(syncCheckPeerConnection, checkMs, pPeer, syncTmrCtrl, &pPeer->timer); } - taosAcquireRef(tsSyncRefId, pNode); + taosAcquireRef(tsSyncRefId, pNode->rid); return pPeer; } @@ -1105,7 +1096,7 @@ static void syncProcessBrokenLink(void *param) { SSyncPeer *pPeer = param; SSyncNode *pNode = pPeer->pSyncNode; - if (taosAcquireRef(tsSyncRefId, pNode) < 0) return; + if (taosAcquireRef(tsSyncRefId, pNode->rid) < 0) return; pthread_mutex_lock(&(pNode->mutex)); sDebug("%s, TCP link is broken(%s)", pPeer->id, strerror(errno)); @@ -1116,7 +1107,7 @@ static void syncProcessBrokenLink(void *param) { } pthread_mutex_unlock(&(pNode->mutex)); - taosReleaseRef(tsSyncRefId, pNode); + taosReleaseRef(tsSyncRefId, pNode->rid); } static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) { @@ -1184,10 +1175,9 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code } static void syncMonitorFwdInfos(void *param, void *tmrId) { - SSyncNode *pNode = param; - - int ret = taosAcquireRef(tsSyncRefId, pNode); - if ( ret < 0) return; + int64_t rid = (int64_t) param; + SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid); + if (pNode == NULL) return; SSyncFwds *pSyncFwds = pNode->pSyncFwds; @@ -1206,10 +1196,10 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) { pthread_mutex_unlock(&(pNode->mutex)); } - pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, pNode, syncTmrCtrl); + pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, (void *)pNode->rid, syncTmrCtrl); } - taosReleaseRef(tsSyncRefId, pNode); + taosReleaseRef(tsSyncRefId, rid); } static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int qtype) { diff --git a/src/sync/test/syncServer.c b/src/sync/test/syncServer.c index 0cf752da97..9dd3feb461 100644 --- a/src/sync/test/syncServer.c +++ b/src/sync/test/syncServer.c @@ -30,7 +30,7 @@ int dataFd = -1; void * qhandle = NULL; int walNum = 0; uint64_t tversion = 0; -void * syncHandle; +int64_t syncHandle; int role; int nodeId; char path[256]; diff --git a/src/vnode/inc/vnodeInt.h b/src/vnode/inc/vnodeInt.h index 169334c611..9a7722c57d 100644 --- a/src/vnode/inc/vnodeInt.h +++ b/src/vnode/inc/vnodeInt.h @@ -47,7 +47,7 @@ typedef struct { void *rqueue; void *wal; void *tsdb; - void *sync; + int64_t sync; void *events; void *cq; // continuous query int32_t cfgVersion; diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index e206933116..ab1341a0da 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -44,12 +44,12 @@ static void vnodeCtrlFlow(void *handle, int32_t mseconds); static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion); #ifndef _SYNC -tsync_h syncStart(const SSyncInfo *info) { return NULL; } -int32_t syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle, int qtype) { return 0; } -void syncStop(tsync_h shandle) {} -int32_t syncReconfig(tsync_h shandle, const SSyncCfg * cfg) { return 0; } -int syncGetNodesRole(tsync_h shandle, SNodesRole * cfg) { return 0; } -void syncConfirmForward(tsync_h shandle, uint64_t version, int32_t code) {} +int64_t syncStart(const SSyncInfo *info) { return NULL; } +int32_t syncForwardToPeer(int64_t rid, void *pHead, void *mhandle, int qtype) { return 0; } +void syncStop(int64_t rid) {} +int32_t syncReconfig(int64_t rid, const SSyncCfg * cfg) { return 0; } +int syncGetNodesRole(int64_t rid, SNodesRole * cfg) { return 0; } +void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) {} #endif char* vnodeStatus[] = { @@ -330,7 +330,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { #ifndef _SYNC pVnode->role = TAOS_SYNC_ROLE_MASTER; #else - if (pVnode->sync == NULL) { + if (pVnode->sync <= 0) { vError("vgId:%d, failed to open sync module, replica:%d reason:%s", pVnode->vgId, pVnode->syncCfg.replica, tstrerror(terrno)); vnodeCleanUp(pVnode); @@ -589,9 +589,9 @@ static void vnodeCleanUp(SVnodeObj *pVnode) { } // stop replication module - if (pVnode->sync) { - void *sync = pVnode->sync; - pVnode->sync = NULL; + if (pVnode->sync > 0) { + int64_t sync = pVnode->sync; + pVnode->sync = -1; syncStop(sync); } diff --git a/src/wal/inc/walInt.h b/src/wal/inc/walInt.h index 5273eb5b1c..d1e9772259 100644 --- a/src/wal/inc/walInt.h +++ b/src/wal/inc/walInt.h @@ -43,6 +43,7 @@ extern int32_t wDebugFlag; typedef struct { uint64_t version; int64_t fileId; + int64_t rid; int32_t vgId; int32_t fd; int32_t keep; diff --git a/src/wal/src/walMgmt.c b/src/wal/src/walMgmt.c index c8f0274174..2c4349ed3a 100644 --- a/src/wal/src/walMgmt.c +++ b/src/wal/src/walMgmt.c @@ -78,7 +78,8 @@ void *walOpen(char *path, SWalCfg *pCfg) { return NULL; } - if (taosAddRef(tsWal.refId, pWal) != TSDB_CODE_SUCCESS) { + pWal->rid = taosAddRef(tsWal.refId, pWal); + if (pWal->rid < 0) { walFreeObj(pWal); return NULL; } @@ -143,7 +144,7 @@ void walClose(void *handle) { } pthread_mutex_unlock(&pWal->mutex); - taosRemoveRef(tsWal.refId, pWal); + taosRemoveRef(tsWal.refId, pWal->rid); } static int32_t walInitObj(SWal *pWal) { @@ -185,7 +186,7 @@ static void walUpdateSeq() { } static void walFsyncAll() { - SWal *pWal = taosIterateRef(tsWal.refId, NULL); + SWal *pWal = taosIterateRef(tsWal.refId, 0); while (pWal) { if (walNeedFsync(pWal)) { wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->vgId, pWal->level, pWal->fsyncSeq, tsWal.seq); @@ -194,7 +195,7 @@ static void walFsyncAll() { wError("vgId:%d, file:%s, failed to fsync since %s", pWal->vgId, pWal->name, strerror(code)); } } - pWal = taosIterateRef(tsWal.refId, pWal); + pWal = taosIterateRef(tsWal.refId, pWal->rid); } } -- GitLab