From 232d59a935b21a9703348775c4728fc9223e6950 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 22 Dec 2020 18:07:25 +0800 Subject: [PATCH] TD-1927 --- src/sync/inc/syncInt.h | 18 +-- src/sync/src/syncMain.c | 219 ++++++++++++++++++++++++------------ src/sync/src/syncRestore.c | 7 +- src/sync/src/syncRetrieve.c | 7 +- src/sync/src/syncTcp.c | 2 +- src/vnode/src/vnodeMgmt.c | 5 +- 6 files changed, 172 insertions(+), 86 deletions(-) diff --git a/src/sync/inc/syncInt.h b/src/sync/inc/syncInt.h index d855c651f9..ae6a5a4fe4 100644 --- a/src/sync/inc/syncInt.h +++ b/src/sync/inc/syncInt.h @@ -86,9 +86,10 @@ typedef struct SsyncPeer { int32_t peerFd; // forward FD int32_t numOfRetrieves; // number of retrieves tried int32_t fileChanged; // a flag to indicate file is changed during retrieving process + int32_t refCount; + int64_t rid; void * timer; void * pConn; - int32_t refCount; // reference count struct SSyncNode *pSyncNode; } SSyncPeer; @@ -98,6 +99,7 @@ typedef struct SSyncNode { int8_t quorum; int8_t selfIndex; uint32_t vgId; + int32_t refCount; int64_t rid; SSyncPeer * peerInfo[TAOS_SYNC_MAX_REPLICA + 1]; // extra one for arbitrator SSyncPeer * pMaster; @@ -121,13 +123,13 @@ extern int32_t tsSyncNum; extern char tsNodeFqdn[TSDB_FQDN_LEN]; extern char * syncStatus[]; -void *syncRetrieveData(void *param); -void *syncRestoreData(void *param); -int32_t syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead); -void syncRestartConnection(SSyncPeer *pPeer); -void syncBroadcastStatus(SSyncNode *pNode); -void syncAddPeerRef(SSyncPeer *pPeer); -int32_t syncDecPeerRef(SSyncPeer *pPeer); +void * syncRetrieveData(void *param); +void * syncRestoreData(void *param); +int32_t syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead); +void syncRestartConnection(SSyncPeer *pPeer); +void syncBroadcastStatus(SSyncNode *pNode); +SSyncPeer *syncAcquirePeer(int64_t rid); +void syncReleasePeer(SSyncPeer *pPeer); #ifdef __cplusplus } diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index 26b6586587..4f42573388 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -35,7 +35,8 @@ char tsNodeFqdn[TSDB_FQDN_LEN] = {0}; static void * tsTcpPool = NULL; static void * tsSyncTmrCtrl = NULL; static void * tsVgIdHash = NULL; -static int32_t tsSyncRefId = -1; +static int32_t tsNodeRefId = -1; +static int32_t tsPeerRefId = -1; // local functions static void syncProcessSyncRequest(char *pMsg, SSyncPeer *pPeer); @@ -48,6 +49,7 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) static void syncRemovePeer(SSyncPeer *pPeer); static void syncAddArbitrator(SSyncNode *pNode); static void syncFreeNode(void *); +static void syncFreePeer(void *); static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode); static void syncMonitorFwdInfos(void *param, void *tmrId); static void syncMonitorNodeRole(void *param, void *tmrId); @@ -55,7 +57,10 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t c static int32_t syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle); static void syncRestartPeer(SSyncPeer *pPeer); static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int32_t qtyp); + static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo); +static SSyncNode *syncAcquireNode(int64_t rid); +static void syncReleaseNode(SSyncNode *pNode); char* syncRole[] = { "offline", @@ -87,29 +92,34 @@ int32_t syncInit() { tsTcpPool = syncOpenTcpThreadPool(&info); if (tsTcpPool == NULL) { sError("failed to init tcpPool"); + syncCleanUp(); return -1; } tsSyncTmrCtrl = taosTmrInit(1000, 50, 10000, "SYNC"); if (tsSyncTmrCtrl == NULL) { sError("failed to init tmrCtrl"); - syncCloseTcpThreadPool(tsTcpPool); - tsTcpPool = NULL; + syncCleanUp(); return -1; } tsVgIdHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); if (tsVgIdHash == NULL) { sError("failed to init vgIdHash"); - taosTmrCleanUp(tsSyncTmrCtrl); - syncCloseTcpThreadPool(tsTcpPool); - tsTcpPool = NULL; - tsSyncTmrCtrl = NULL; + syncCleanUp(); + return -1; + } + + tsNodeRefId = taosOpenRef(200, syncFreeNode); + if (tsNodeRefId < 0) { + sError("failed to init node ref"); + syncCleanUp(); return -1; } - tsSyncRefId = taosOpenRef(200, syncFreeNode); - if (tsSyncRefId < 0) { + tsPeerRefId = taosOpenRef(1000, syncFreePeer); + if (tsPeerRefId < 0) { + sError("failed to init peer ref"); syncCleanUp(); return -1; } @@ -121,12 +131,12 @@ int32_t syncInit() { } void syncCleanUp() { - if (tsTcpPool) { + if (tsTcpPool != NULL) { syncCloseTcpThreadPool(tsTcpPool); tsTcpPool = NULL; } - if (tsSyncTmrCtrl) { + if (tsSyncTmrCtrl != NULL) { taosTmrCleanUp(tsSyncTmrCtrl); tsSyncTmrCtrl = NULL; } @@ -136,8 +146,15 @@ void syncCleanUp() { tsVgIdHash = NULL; } - taosCloseRef(tsSyncRefId); - tsSyncRefId = -1; + if (tsNodeRefId != -1) { + taosCloseRef(tsNodeRefId); + tsNodeRefId = -1; + } + + if (tsPeerRefId != -1) { + taosCloseRef(tsPeerRefId); + tsPeerRefId = -1; + } sInfo("sync module is cleaned up"); } @@ -170,7 +187,8 @@ int64_t syncStart(const SSyncInfo *pInfo) { pNode->quorum = pCfg->quorum; if (pNode->quorum > pNode->replica) pNode->quorum = pNode->replica; - pNode->rid = taosAddRef(tsSyncRefId, pNode); + pNode->refCount = 1; + pNode->rid = taosAddRef(tsNodeRefId, pNode); if (pNode->rid < 0) { syncFreeNode(pNode); return -1; @@ -238,7 +256,7 @@ int64_t syncStart(const SSyncInfo *pInfo) { void syncStop(int64_t rid) { SSyncPeer *pPeer; - SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid); + SSyncNode *pNode = syncAcquireNode(rid); if (pNode == NULL) return; sInfo("vgId:%d, cleanup sync", pNode->vgId); @@ -259,14 +277,14 @@ void syncStop(int64_t rid) { pthread_mutex_unlock(&pNode->mutex); - taosReleaseRef(tsSyncRefId, rid); - taosRemoveRef(tsSyncRefId, rid); + syncReleaseNode(pNode); + taosRemoveRef(tsNodeRefId, rid); } int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) { int32_t i, j; - SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid); + SSyncNode *pNode = syncAcquireNode(rid); if (pNode == NULL) return TSDB_CODE_SYN_INVALID_CONFIG; sInfo("vgId:%d, reconfig, role:%s replica:%d old:%d", pNode->vgId, syncRole[nodeRole], pNewCfg->replica, @@ -335,23 +353,22 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) { sInfo("vgId:%d, %d replicas are configured, quorum:%d", pNode->vgId, pNode->replica, pNode->quorum); syncBroadcastStatus(pNode); - taosReleaseRef(tsSyncRefId, rid); + syncReleaseNode(pNode); return 0; } int32_t syncForwardToPeer(int64_t rid, void *data, void *mhandle, int32_t qtype) { - SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid); - if (pNode == NULL) return 0; + SSyncNode *pNode = syncAcquireNode(rid); + if (pNode == NULL) return 0; int32_t code = syncForwardToPeerImpl(pNode, data, mhandle, qtype); - taosReleaseRef(tsSyncRefId, rid); - + syncReleaseNode(pNode); return code; } void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) { - SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid); + SSyncNode *pNode = syncAcquireNode(rid); if (pNode == NULL) return; SSyncPeer *pPeer = pNode->pMaster; @@ -367,14 +384,14 @@ void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) { } } - taosReleaseRef(tsSyncRefId, rid); + syncReleaseNode(pNode); } #if 0 void syncRecover(int64_t rid) { SSyncPeer *pPeer; - SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid); + SSyncNode *pNode = syncAcquireNode(rid); if (pNode == NULL) return; // to do: add a few lines to check if recover is OK @@ -395,12 +412,12 @@ void syncRecover(int64_t rid) { pthread_mutex_unlock(&pNode->mutex); - taosReleaseRef(tsSyncRefId, rid); + syncReleaseNode(pNode); } #endif int32_t syncGetNodesRole(int64_t rid, SNodesRole *pNodesRole) { - SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid); + SSyncNode *pNode = syncAcquireNode(rid); if (pNode == NULL) return -1; pNodesRole->selfIndex = pNode->selfIndex; @@ -409,8 +426,7 @@ int32_t syncGetNodesRole(int64_t rid, SNodesRole *pNodesRole) { pNodesRole->role[i] = pNode->peerInfo[i]->role; } - taosReleaseRef(tsSyncRefId, rid); - + syncReleaseNode(pNode); return 0; } @@ -446,24 +462,61 @@ static void syncAddArbitrator(SSyncNode *pNode) { static void syncFreeNode(void *param) { SSyncNode *pNode = param; + int32_t refCount = atomic_sub_fetch_32(&pNode->refCount, 1); + sDebug("vgId:%d, snode is freed, refCount:%d", pNode->vgId, refCount); + pthread_mutex_destroy(&pNode->mutex); tfree(pNode->pRecv); tfree(pNode->pSyncFwds); tfree(pNode); } -void syncAddPeerRef(SSyncPeer *pPeer) { atomic_add_fetch_32(&pPeer->refCount, 1); } +SSyncNode *syncAcquireNode(int64_t rid) { + SSyncNode *pNode = taosAcquireRef(tsNodeRefId, rid); + if (pNode == NULL) { + sDebug("failed to acquire snode from refId:%" PRId64, rid); + } else { + int32_t refCount = atomic_add_fetch_32(&pNode->refCount, 1); + sTrace("vgId:%d, acquire snode refId:%" PRId64 ", refCount:%d", pNode->vgId, rid, refCount); + } -int32_t syncDecPeerRef(SSyncPeer *pPeer) { - if (atomic_sub_fetch_32(&pPeer->refCount, 1) == 0) { - taosReleaseRef(tsSyncRefId, pPeer->pSyncNode->rid); + return pNode; +} - sDebug("%s, resource is freed", pPeer->id); - tfree(pPeer); - return 0; +void syncReleaseNode(SSyncNode *pNode) { + int32_t refCount = atomic_sub_fetch_32(&pNode->refCount, 1); + sTrace("vgId:%d, dec snode refId:%" PRId64 " refCount:%d", pNode->vgId, pNode->rid, refCount); + + taosReleaseRef(tsNodeRefId, pNode->rid); +} + +static void syncFreePeer(void *param) { + SSyncPeer *pPeer = param; + + int32_t refCount = atomic_sub_fetch_32(&pPeer->refCount, 1); + sDebug("%s, peer is freed, refCount:%d", pPeer->id, refCount); + + syncReleaseNode(pPeer->pSyncNode); + tfree(pPeer); +} + +SSyncPeer *syncAcquirePeer(int64_t rid) { + SSyncPeer *pPeer = taosAcquireRef(tsPeerRefId, rid); + if (pPeer == NULL) { + sDebug("failed to acquire peer from refId:%" PRId64, rid); + } else { + int32_t refCount = atomic_add_fetch_32(&pPeer->refCount, 1); + sTrace("%s, acquire peer refId:%" PRId64 ", refCount:%d", pPeer->id, rid, refCount); } - return 1; + return pPeer; +} + +void syncReleasePeer(SSyncPeer *pPeer) { + int32_t refCount = atomic_sub_fetch_32(&pPeer->refCount, 1); + sTrace("%s, dec peer refId:%" PRId64 ", refCount:%d", pPeer->id, pPeer->rid, refCount); + + taosReleaseRef(tsPeerRefId, pPeer->rid); } static void syncClosePeerConn(SSyncPeer *pPeer) { @@ -482,7 +535,8 @@ static void syncRemovePeer(SSyncPeer *pPeer) { pPeer->ip = 0; syncClosePeerConn(pPeer); - syncDecPeerRef(pPeer); + //taosRemoveRef(tsPeerRefId, pPeer->rid); + syncReleasePeer(pPeer); } static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) { @@ -508,17 +562,18 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) { pPeer->role = TAOS_SYNC_ROLE_OFFLINE; pPeer->pSyncNode = pNode; pPeer->refCount = 1; + pPeer->rid = taosAddRef(tsPeerRefId, pPeer); - sInfo("%s, it is configured, ep:%s:%u", pPeer->id, pPeer->fqdn, pPeer->port); + sInfo("%s, %p it is configured, ep:%s:%u rid:%" PRId64, pPeer->id, pPeer, pPeer->fqdn, pPeer->port, pPeer->rid); int32_t ret = strcmp(pPeer->fqdn, tsNodeFqdn); if (pPeer->nodeId == 0 || (ret > 0) || (ret == 0 && pPeer->port > tsSyncPort)) { int32_t checkMs = 100 + (pNode->vgId * 10) % 100; if (pNode->vgId > 1) checkMs = tsStatusInterval * 1000 + checkMs; sDebug("%s, check peer connection after %d ms", pPeer->id, checkMs); - taosTmrReset(syncCheckPeerConnection, checkMs, pPeer, tsSyncTmrCtrl, &pPeer->timer); + taosTmrReset(syncCheckPeerConnection, checkMs, (void *)pPeer->rid, tsSyncTmrCtrl, &pPeer->timer); } - taosAcquireRef(tsSyncRefId, pNode->rid); + (void)syncAcquireNode(pNode->rid); return pPeer; } @@ -751,7 +806,7 @@ static void syncRestartPeer(SSyncPeer *pPeer) { int32_t ret = strcmp(pPeer->fqdn, tsNodeFqdn); if (ret > 0 || (ret == 0 && pPeer->port > tsSyncPort)) { sDebug("%s, check peer connection in 1000 ms", pPeer->id); - taosTmrReset(syncCheckPeerConnection, SYNC_CHECK_INTERVAL, pPeer, tsSyncTmrCtrl, &pPeer->timer); + taosTmrReset(syncCheckPeerConnection, SYNC_CHECK_INTERVAL, (void *)pPeer->rid, tsSyncTmrCtrl, &pPeer->timer); } } @@ -780,25 +835,30 @@ static void syncProcessSyncRequest(char *msg, SSyncPeer *pPeer) { } // start a new thread to retrieve the data - syncAddPeerRef(pPeer); + (void)syncAcquirePeer(pPeer->rid); + pthread_attr_t thattr; pthread_t thread; pthread_attr_init(&thattr); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED); - int32_t ret = pthread_create(&thread, &thattr, syncRetrieveData, pPeer); + int32_t ret = pthread_create(&thread, &thattr, syncRetrieveData, (void *)pPeer->rid); pthread_attr_destroy(&thattr); if (ret != 0) { sError("%s, failed to create sync thread since %s", pPeer->id, strerror(errno)); - syncDecPeerRef(pPeer); } else { pPeer->sstatus = TAOS_SYNC_STATUS_START; sDebug("%s, thread is created to retrieve data, set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]); } + + syncReleasePeer(pPeer); } static void syncNotStarted(void *param, void *tmrId) { - SSyncPeer *pPeer = param; + int64_t rid = (int64_t)param; + SSyncPeer *pPeer = syncAcquirePeer(rid); + if (pPeer == NULL) return; + SSyncNode *pNode = pPeer->pSyncNode; pthread_mutex_lock(&pNode->mutex); @@ -807,15 +867,22 @@ static void syncNotStarted(void *param, void *tmrId) { sInfo("%s, sync conn is still not up, restart and set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]); syncRestartConnection(pPeer); pthread_mutex_unlock(&pNode->mutex); + + syncReleasePeer(pPeer); } static void syncTryRecoverFromMaster(void *param, void *tmrId) { - SSyncPeer *pPeer = param; + int64_t rid = (int64_t)param; + SSyncPeer *pPeer = syncAcquirePeer(rid); + if (pPeer == NULL) return; + SSyncNode *pNode = pPeer->pSyncNode; pthread_mutex_lock(&pNode->mutex); syncRecoverFromMaster(pPeer); pthread_mutex_unlock(&pNode->mutex); + + syncReleasePeer(pPeer); } static void syncRecoverFromMaster(SSyncPeer *pPeer) { @@ -831,7 +898,7 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) { // Ensure the sync of mnode not interrupted if (pNode->vgId != 1 && tsSyncNum >= SYNC_MAX_NUM) { sInfo("%s, %d syncs are in process, try later", pPeer->id, tsSyncNum); - taosTmrReset(syncTryRecoverFromMaster, 500 + (pNode->vgId * 10) % 200, pPeer, tsSyncTmrCtrl, &pPeer->timer); + taosTmrReset(syncTryRecoverFromMaster, 500 + (pNode->vgId * 10) % 200, (void *)pPeer->rid, tsSyncTmrCtrl, &pPeer->timer); return; } @@ -840,7 +907,7 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) { SSyncMsg msg; syncBuildSyncReqMsg(&msg, pNode->vgId); - taosTmrReset(syncNotStarted, SYNC_CHECK_INTERVAL, pPeer, tsSyncTmrCtrl, &pPeer->timer); + taosTmrReset(syncNotStarted, SYNC_CHECK_INTERVAL, (void *)pPeer->rid, tsSyncTmrCtrl, &pPeer->timer); if (taosWriteMsg(pPeer->peerFd, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) { sError("%s, failed to send sync-req to peer", pPeer->id); @@ -929,7 +996,10 @@ static int32_t syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead) { } static int32_t syncProcessPeerMsg(void *param, void *buffer) { - SSyncPeer *pPeer = param; + int64_t rid = (int64_t)param; + SSyncPeer *pPeer = syncAcquirePeer(rid); + if (pPeer == NULL) return -1; + SSyncHead *pHead = buffer; SSyncNode *pNode = pPeer->pSyncNode; @@ -950,6 +1020,7 @@ static int32_t syncProcessPeerMsg(void *param, void *buffer) { } pthread_mutex_unlock(&pNode->mutex); + syncReleasePeer(pPeer); return code; } @@ -1003,7 +1074,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) { int32_t connFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0); if (connFd < 0) { sDebug("%s, failed to open tcp socket since %s", pPeer->id, strerror(errno)); - taosTmrReset(syncCheckPeerConnection, SYNC_CHECK_INTERVAL, pPeer, tsSyncTmrCtrl, &pPeer->timer); + taosTmrReset(syncCheckPeerConnection, SYNC_CHECK_INTERVAL, (void *)pPeer->rid, tsSyncTmrCtrl, &pPeer->timer); return; } @@ -1015,16 +1086,18 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) { pPeer->peerFd = connFd; pPeer->role = TAOS_SYNC_ROLE_UNSYNCED; pPeer->pConn = syncAllocateTcpConn(tsTcpPool, pPeer, connFd); - syncAddPeerRef(pPeer); } else { sDebug("%s, failed to setup peer connection to server since %s, try later", pPeer->id, strerror(errno)); taosClose(connFd); - taosTmrReset(syncCheckPeerConnection, SYNC_CHECK_INTERVAL, pPeer, tsSyncTmrCtrl, &pPeer->timer); + taosTmrReset(syncCheckPeerConnection, SYNC_CHECK_INTERVAL, (void *)pPeer->rid, tsSyncTmrCtrl, &pPeer->timer); } } static void syncCheckPeerConnection(void *param, void *tmrId) { - SSyncPeer *pPeer = param; + int64_t rid = (int64_t)param; + SSyncPeer *pPeer = syncAcquirePeer(rid); + if (pPeer == NULL) return; + SSyncNode *pNode = pPeer->pSyncNode; pthread_mutex_lock(&pNode->mutex); @@ -1033,6 +1106,8 @@ static void syncCheckPeerConnection(void *param, void *tmrId) { syncSetupPeerConnection(pPeer); pthread_mutex_unlock(&pNode->mutex); + + syncReleasePeer(pPeer); } static void syncCreateRestoreDataThread(SSyncPeer *pPeer) { @@ -1043,8 +1118,9 @@ static void syncCreateRestoreDataThread(SSyncPeer *pPeer) { pthread_attr_init(&thattr); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED); - syncAddPeerRef(pPeer); - int32_t ret = pthread_create(&(thread), &thattr, (void *)syncRestoreData, pPeer); + (void)syncAcquirePeer(pPeer->rid); + + int32_t ret = pthread_create(&(thread), &thattr, (void *)syncRestoreData, (void *)pPeer->rid); pthread_attr_destroy(&thattr); if (ret < 0) { @@ -1052,10 +1128,11 @@ static void syncCreateRestoreDataThread(SSyncPeer *pPeer) { nodeSStatus = TAOS_SYNC_STATUS_INIT; sError("%s, failed to create sync thread, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]); taosClose(pPeer->syncFd); - syncDecPeerRef(pPeer); } else { sInfo("%s, sync connection is up", pPeer->id); } + + syncReleasePeer(pPeer); } static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { @@ -1087,7 +1164,7 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { return; } - sDebug("vgId:%d, sync msg is received, tranId:%u", vgId, msg.tranId); + sDebug("vgId:%d, sync connection is incomming, tranId:%u", vgId, msg.tranId); SSyncNode *pNode = *ppNode; pthread_mutex_lock(&pNode->mutex); @@ -1116,7 +1193,6 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { syncClosePeerConn(pPeer); pPeer->peerFd = connFd; pPeer->pConn = syncAllocateTcpConn(tsTcpPool, pPeer, connFd); - syncAddPeerRef(pPeer); sDebug("%s, ready to exchange data", pPeer->id); syncSendPeersStatusMsgToPeer(pPeer, 1, SYNC_STATUS_EXCHANGE_DATA, syncGenTranId()); } @@ -1126,22 +1202,21 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { } static void syncProcessBrokenLink(void *param) { - if (param == NULL) return; // the connection for arbitrator - SSyncPeer *pPeer = param; + int64_t rid = (int64_t)param; + SSyncPeer *pPeer = syncAcquirePeer(rid); + if (pPeer == NULL) return; + SSyncNode *pNode = pPeer->pSyncNode; - if (taosAcquireRef(tsSyncRefId, pNode->rid) == NULL) return; pthread_mutex_lock(&pNode->mutex); sDebug("%s, TCP link is broken since %s, pfd:%d sfd:%d", pPeer->id, strerror(errno), pPeer->peerFd, pPeer->syncFd); pPeer->peerFd = -1; - if (syncDecPeerRef(pPeer) != 0) { - syncRestartConnection(pPeer); - } - + syncRestartConnection(pPeer); pthread_mutex_unlock(&pNode->mutex); - taosReleaseRef(tsSyncRefId, pNode->rid); + + syncReleasePeer(pPeer); } static int32_t syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) { @@ -1212,7 +1287,7 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code static void syncMonitorNodeRole(void *param, void *tmrId) { int64_t rid = (int64_t)param; - SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid); + SSyncNode *pNode = syncAcquireNode(rid); if (pNode == NULL) return; for (int32_t index = 0; index < pNode->replica; index++) { @@ -1229,12 +1304,12 @@ static void syncMonitorNodeRole(void *param, void *tmrId) { } pNode->pRoleTimer = taosTmrStart(syncMonitorNodeRole, SYNC_ROLE_TIMER, (void *)pNode->rid, tsSyncTmrCtrl); - taosReleaseRef(tsSyncRefId, rid); + syncReleaseNode(pNode); } static void syncMonitorFwdInfos(void *param, void *tmrId) { int64_t rid = (int64_t)param; - SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid); + SSyncNode *pNode = syncAcquireNode(rid); if (pNode == NULL) return; SSyncFwds *pSyncFwds = pNode->pSyncFwds; @@ -1260,7 +1335,7 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) { pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, SYNC_FWD_TIMER, (void *)pNode->rid, tsSyncTmrCtrl); } - taosReleaseRef(tsSyncRefId, rid); + syncReleaseNode(pNode); } static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int32_t qtype) { diff --git a/src/sync/src/syncRestore.c b/src/sync/src/syncRestore.c index e4e6927387..e815594883 100644 --- a/src/sync/src/syncRestore.c +++ b/src/sync/src/syncRestore.c @@ -351,7 +351,10 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) { } void *syncRestoreData(void *param) { - SSyncPeer *pPeer = param; + int64_t rid = (int64_t)param; + SSyncPeer *pPeer = syncAcquirePeer(rid); + if (pPeer == NULL) return NULL; + SSyncNode *pNode = pPeer->pSyncNode; taosBlockSIGPIPE(); @@ -382,7 +385,7 @@ void *syncRestoreData(void *param) { taosClose(pPeer->syncFd); syncCloseRecvBuffer(pNode); __sync_fetch_and_sub(&tsSyncNum, 1); - syncDecPeerRef(pPeer); + syncReleasePeer(pPeer); return NULL; } diff --git a/src/sync/src/syncRetrieve.c b/src/sync/src/syncRetrieve.c index 6a70835c31..f3e0a6d353 100644 --- a/src/sync/src/syncRetrieve.c +++ b/src/sync/src/syncRetrieve.c @@ -464,7 +464,10 @@ static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) { } void *syncRetrieveData(void *param) { - SSyncPeer *pPeer = (SSyncPeer *)param; + int64_t rid = (int64_t)param; + SSyncPeer *pPeer = syncAcquirePeer(rid); + if (pPeer == NULL) return NULL; + SSyncNode *pNode = pPeer->pSyncNode; taosBlockSIGPIPE(); @@ -493,7 +496,7 @@ void *syncRetrieveData(void *param) { pPeer->fileChanged = 0; taosClose(pPeer->syncFd); - syncDecPeerRef(pPeer); + syncReleasePeer(pPeer); return NULL; } diff --git a/src/sync/src/syncTcp.c b/src/sync/src/syncTcp.c index 7bfdc4e440..fa8d486e40 100644 --- a/src/sync/src/syncTcp.c +++ b/src/sync/src/syncTcp.c @@ -130,7 +130,7 @@ void *syncAllocateTcpConn(void *param, void *pPeer, int32_t connFd) { pConn->fd = connFd; pConn->pThread = pThread; - pConn->ahandle = pPeer; + pConn->ahandle = (void *)(((SSyncPeer *)pPeer)->rid); pConn->closedByApp = 0; event.events = EPOLLIN | EPOLLRDHUP; diff --git a/src/vnode/src/vnodeMgmt.c b/src/vnode/src/vnodeMgmt.c index ee5aa5ca90..5cae7b7606 100644 --- a/src/vnode/src/vnodeMgmt.c +++ b/src/vnode/src/vnodeMgmt.c @@ -89,7 +89,10 @@ static void vnodeIncRef(void *ptNode) { } void *vnodeAcquire(int32_t vgId) { - SVnodeObj **ppVnode = taosHashGetCB(tsVnodesHash, &vgId, sizeof(int32_t), vnodeIncRef, NULL, sizeof(void *)); + SVnodeObj **ppVnode = NULL; + if (tsVnodesHash != NULL) { + ppVnode = taosHashGetCB(tsVnodesHash, &vgId, sizeof(int32_t), vnodeIncRef, NULL, sizeof(void *)); + } if (ppVnode == NULL || *ppVnode == NULL) { terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; -- GitLab