diff --git a/src/dnode/src/dnodeVRead.c b/src/dnode/src/dnodeVRead.c index 1a3d0ebc27e432d5038301908514f941cc0dfb18..654e35cad54c106d450871bcef653a18ac448b55 100644 --- a/src/dnode/src/dnodeVRead.c +++ b/src/dnode/src/dnodeVRead.c @@ -189,7 +189,6 @@ void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) { void dnodeDispatchNonRspMsg(void *pVnode, SReadMsg *pRead, int32_t code) { rpcFreeCont(pRead->rpcMsg.pCont); vnodeRelease(pVnode); - return; } static void *dnodeProcessReadQueue(void *param) { diff --git a/src/mnode/src/mnodeProfile.c b/src/mnode/src/mnodeProfile.c index fc76e3dce39dc2f6eeee3aac19d13f2dc605c1a1..f8f99e22c6f437b1eece0d704c8c4551bc434110 100644 --- a/src/mnode/src/mnodeProfile.c +++ b/src/mnode/src/mnodeProfile.c @@ -111,7 +111,6 @@ void mnodeReleaseConn(SConnObj *pConn) { } SConnObj *mnodeAccquireConn(int32_t connId, char *user, uint32_t ip, uint16_t port) { - uint64_t expireTime = CONN_KEEP_TIME * 1000 + (uint64_t)taosGetTimestampMs(); SConnObj *pConn = taosCacheAcquireByKey(tsMnodeConnCache, &connId, sizeof(int32_t)); if (pConn == NULL) { mDebug("connId:%d, is already destroyed, user:%s ip:%s:%u", connId, user, taosIpStr(ip), port); @@ -126,7 +125,7 @@ SConnObj *mnodeAccquireConn(int32_t connId, char *user, uint32_t ip, uint16_t po } // mDebug("connId:%d, is incoming, user:%s ip:%s:%u", connId, pConn->user, taosIpStr(pConn->ip), pConn->port); - pConn->lastAccess = expireTime; + pConn->lastAccess = CONN_KEEP_TIME * 1000 + (uint64_t)taosGetTimestampMs(); return pConn; } @@ -626,7 +625,7 @@ static int32_t mnodeProcessKillConnectionMsg(SMnodeMsg *pMsg) { SCMKillConnMsg *pKill = pMsg->rpcMsg.pCont; int32_t connId = atoi(pKill->queryId); - SConnObj * pConn = taosCacheAcquireByKey(tsMnodeConnCache, &connId, sizeof(int32_t)); + SConnObj *pConn = taosCacheAcquireByKey(tsMnodeConnCache, &connId, sizeof(int32_t)); if (pConn == NULL) { mError("connId:%s, failed to kill, conn not exist", pKill->queryId); return TSDB_CODE_MND_INVALID_CONN_ID; diff --git a/src/vnode/inc/vnodeInt.h b/src/vnode/inc/vnodeInt.h index 169334c6119a35f53fd0a69dd1f7cb6952fbb727..327800c549ca3b92135de3cb803f449259b91b4f 100644 --- a/src/vnode/inc/vnodeInt.h +++ b/src/vnode/inc/vnodeInt.h @@ -50,6 +50,7 @@ typedef struct { void *sync; void *events; void *cq; // continuous query + void **ppVnode; int32_t cfgVersion; STsdbCfg tsdbCfg; SSyncCfg syncCfg; diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index a9bcf948b6cb00c25a0b283a0cb63126925abd8c..222c89f8e584b2c3e5b835a0cc29171aa3c10a82 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -34,7 +34,7 @@ #define TSDB_VNODE_VERSION_CONTENT_LEN 31 -static SHashObj*tsDnodeVnodesHash; +static SCacheObj* tsDnodeVnodesCache; static void vnodeCleanUp(SVnodeObj *pVnode); static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg); static int32_t vnodeReadCfg(SVnodeObj *pVnode); @@ -46,6 +46,7 @@ static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index); static void vnodeNotifyRole(void *ahandle, int8_t role); static void vnodeCtrlFlow(void *handle, int32_t mseconds); static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion); +static void vnodeFreeVnodeObj(void *data); #ifndef _SYNC tsync_h syncStart(const SSyncInfo *info) { return NULL; } @@ -63,9 +64,9 @@ int32_t vnodeInitResources() { vnodeInitWriteFp(); vnodeInitReadFp(); - tsDnodeVnodesHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, true); - if (tsDnodeVnodesHash == NULL) { - vError("failed to init vnode list"); + tsDnodeVnodesCache = taosCacheInit(TSDB_DATA_TYPE_INT, 5, true, vnodeFreeVnodeObj, "vnode"); + if (tsDnodeVnodesCache == NULL) { + vError("failed to init vnode cache"); return TSDB_CODE_VND_OUT_OF_MEMORY; } @@ -73,9 +74,10 @@ int32_t vnodeInitResources() { } void vnodeCleanupResources() { - if (tsDnodeVnodesHash != NULL) { - taosHashCleanup(tsDnodeVnodesHash); - tsDnodeVnodesHash = NULL; + if (tsDnodeVnodesCache != NULL) { + vDebug("vnode cache is cleanup"); + taosCacheCleanup(tsDnodeVnodesCache); + tsDnodeVnodesCache = NULL; } syncCleanUp(); @@ -84,9 +86,10 @@ void vnodeCleanupResources() { int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) { int32_t code; - SVnodeObj *pTemp = (SVnodeObj *)taosHashGet(tsDnodeVnodesHash, (const char *)&pVnodeCfg->cfg.vgId, sizeof(int32_t)); - if (pTemp != NULL) { - vInfo("vgId:%d, vnode already exist, pVnode:%p", pVnodeCfg->cfg.vgId, pTemp); + SVnodeObj *pVnode = vnodeAcquire(pVnodeCfg->cfg.vgId); + if (pVnode != NULL) { + vDebug("vgId:%d, vnode already exist, pVnode:%p", pVnodeCfg->cfg.vgId, pVnode); + vnodeRelease(pVnode); return TSDB_CODE_SUCCESS; } @@ -143,22 +146,24 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) { return TSDB_CODE_VND_INIT_FAILED; } - vInfo("vgId:%d, vnode is created, walLevel:%d fsyncPeriod:%d", pVnodeCfg->cfg.vgId, pVnodeCfg->cfg.walLevel, pVnodeCfg->cfg.fsyncPeriod); + vInfo("vgId:%d, vnode dir is created, walLevel:%d fsyncPeriod:%d", pVnodeCfg->cfg.vgId, pVnodeCfg->cfg.walLevel, pVnodeCfg->cfg.fsyncPeriod); code = vnodeOpen(pVnodeCfg->cfg.vgId, rootDir); return code; } int32_t vnodeDrop(int32_t vgId) { - SVnodeObj **ppVnode = (SVnodeObj **)taosHashGet(tsDnodeVnodesHash, (const char *)&vgId, sizeof(int32_t)); - if (ppVnode == NULL || *ppVnode == NULL) { - vDebug("vgId:%d, failed to drop, vgId not find", vgId); + SVnodeObj *pVnode = vnodeAcquire(vgId); + if (pVnode == NULL) { + vDebug("vgId:%d, failed to drop, vnode not find", vgId); return TSDB_CODE_VND_INVALID_VGROUP_ID; } - SVnodeObj *pVnode = *ppVnode; - vTrace("vgId:%d, vnode will be dropped, refCount:%d", pVnode->vgId, pVnode->refCount); + vInfo("vgId:%d, vnode will be dropped, refCount:%d pVnode:%p data:%p", pVnode->vgId, pVnode->refCount, pVnode, + pVnode->ppVnode); pVnode->dropped = 1; + + vnodeRelease(pVnode); vnodeCleanUp(pVnode); return TSDB_CODE_SUCCESS; @@ -231,6 +236,10 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { pVnode->accessState = TSDB_VN_ALL_ACCCESS; tsem_init(&pVnode->sem, 0, 0); + pVnode->ppVnode = taosCachePut(tsDnodeVnodesCache, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *), 8); + vDebug("vgId:%d, vnode is opened in %s, pVnode:%p data:%p", pVnode->vgId, rootDir, pVnode, pVnode->ppVnode); + assert(pVnode->ppVnode != NULL); + int32_t code = vnodeReadCfg(pVnode); if (code != TSDB_CODE_SUCCESS) { vnodeCleanUp(pVnode); @@ -332,19 +341,16 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { pVnode->events = NULL; pVnode->status = TAOS_VN_STATUS_READY; - vDebug("vgId:%d, vnode is opened in %s, pVnode:%p", pVnode->vgId, rootDir, pVnode); - - taosHashPut(tsDnodeVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t), (char *)(&pVnode), sizeof(SVnodeObj *)); return TSDB_CODE_SUCCESS; } int32_t vnodeClose(int32_t vgId) { - SVnodeObj **ppVnode = (SVnodeObj **)taosHashGet(tsDnodeVnodesHash, (const char *)&vgId, sizeof(int32_t)); - if (ppVnode == NULL || *ppVnode == NULL) return 0; + SVnodeObj *pVnode = vnodeAcquire(vgId); + if (pVnode == NULL) return 0; - SVnodeObj *pVnode = *ppVnode; - vDebug("vgId:%d, vnode will be closed", pVnode->vgId); + vDebug("vgId:%d, vnode will be closed, pVnode:%p", pVnode->vgId, pVnode); + vnodeRelease(pVnode); vnodeCleanUp(pVnode); return 0; @@ -357,19 +363,26 @@ void vnodeRelease(void *pVnodeRaw) { int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1); assert(refCount >= 0); + void **ppVnode = pVnode->ppVnode; + taosCacheRelease(tsDnodeVnodesCache, (void **)(&ppVnode), false); + vTrace("vgId:%d, release vnode, refCount:%d pVnode:%p data:%p", vgId, refCount, pVnode, pVnode->ppVnode); + if (refCount > 0) { - vDebug("vgId:%d, release vnode, refCount:%d", vgId, refCount); - if (pVnode->status == TAOS_VN_STATUS_RESET && refCount == 2) + if (pVnode->status == TAOS_VN_STATUS_RESET && refCount == 2) { tsem_post(&pVnode->sem); + } return; } - qCleanupQueryMgmt(pVnode->qMgmt); - pVnode->qMgmt = NULL; + if (pVnode->qMgmt) { + qCleanupQueryMgmt(pVnode->qMgmt); + pVnode->qMgmt = NULL; + } - if (pVnode->tsdb) + if (pVnode->tsdb) { tsdbCloseRepo(pVnode->tsdb, 1); - pVnode->tsdb = NULL; + pVnode->tsdb = NULL; + } // stop continuous query if (pVnode->cq) { @@ -378,18 +391,21 @@ void vnodeRelease(void *pVnodeRaw) { cqClose(cq); } - if (pVnode->wal) + if (pVnode->wal) { walClose(pVnode->wal); - pVnode->wal = NULL; + pVnode->wal = NULL; + } - if (pVnode->wqueue) + if (pVnode->wqueue) { dnodeFreeVnodeWqueue(pVnode->wqueue); - pVnode->wqueue = NULL; + pVnode->wqueue = NULL; + } - if (pVnode->rqueue) + if (pVnode->rqueue) { dnodeFreeVnodeRqueue(pVnode->rqueue); - pVnode->rqueue = NULL; - + pVnode->rqueue = NULL; + } + taosTFree(pVnode->rootDir); if (pVnode->dropped) { @@ -412,21 +428,22 @@ void vnodeRelease(void *pVnodeRaw) { tsem_destroy(&pVnode->sem); free(pVnode); - int32_t count = taosHashGetSize(tsDnodeVnodesHash); + int32_t count = taosHashGetSize(tsDnodeVnodesCache->pHashTable); vDebug("vgId:%d, vnode is released, vnodes:%d", vgId, count); } void *vnodeAcquire(int32_t vgId) { - SVnodeObj **ppVnode = (SVnodeObj **)taosHashGet(tsDnodeVnodesHash, (const char *)&vgId, sizeof(int32_t)); + SVnodeObj **ppVnode = taosCacheAcquireByKey(tsDnodeVnodesCache, &vgId, sizeof(int32_t)); + if (ppVnode == NULL || *ppVnode == NULL) { terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; - vInfo("vgId:%d, not exist", vgId); + vDebug("vgId:%d, not exist", vgId); return NULL; } SVnodeObj *pVnode = *ppVnode; atomic_add_fetch_32(&pVnode->refCount, 1); - vDebug("vgId:%d, get vnode, refCount:%d", pVnode->vgId, pVnode->refCount); + vTrace("vgId:%d, get vnode, refCount:%d pVnode:%p data:%p", pVnode->vgId, pVnode->refCount, pVnode, pVnode->ppVnode); return pVnode; } @@ -487,7 +504,7 @@ static void vnodeBuildVloadMsg(SVnodeObj *pVnode, SDMStatusMsg *pStatus) { } int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) { - SHashMutableIterator *pIter = taosHashCreateIter(tsDnodeVnodesHash); + SHashMutableIterator *pIter = taosHashCreateIter(tsDnodeVnodesCache->pHashTable); while (taosHashIterNext(pIter)) { SVnodeObj **pVnode = taosHashIterGet(pIter); if (pVnode == NULL) continue; @@ -508,7 +525,7 @@ int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) { void vnodeBuildStatusMsg(void *param) { SDMStatusMsg *pStatus = param; - SHashMutableIterator *pIter = taosHashCreateIter(tsDnodeVnodesHash); + SHashMutableIterator *pIter = taosHashCreateIter(tsDnodeVnodesCache->pHashTable); while (taosHashIterNext(pIter)) { SVnodeObj **pVnode = taosHashIterGet(pIter); @@ -528,7 +545,7 @@ void vnodeSetAccess(SDMVgroupAccess *pAccess, int32_t numOfVnodes) { if (pVnode != NULL) { pVnode->accessState = pAccess[i].accessState; if (pVnode->accessState != TSDB_VN_ALL_ACCCESS) { - vDebug("vgId:%d, access state is set to %d", pAccess[i].vgId, pVnode->accessState) + vDebug("vgId:%d, access state is set to %d", pAccess[i].vgId, pVnode->accessState); } vnodeRelease(pVnode); } @@ -537,12 +554,13 @@ void vnodeSetAccess(SDMVgroupAccess *pAccess, int32_t numOfVnodes) { static void vnodeCleanUp(SVnodeObj *pVnode) { // remove from hash, so new messages wont be consumed - taosHashRemove(tsDnodeVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t)); - int i = 0; + // taosCacheRelease(tsDnodeVnodesCache, (void **)&pVnode->ppVnode, true); if (pVnode->status != TAOS_VN_STATUS_INIT) { // it may be in updateing or reset state, then it shall wait - while (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_CLOSING) != TAOS_VN_STATUS_READY) { + int i = 0; + while (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_CLOSING) != + TAOS_VN_STATUS_READY) { if (++i % 1000 == 0) { sched_yield(); } @@ -556,7 +574,8 @@ static void vnodeCleanUp(SVnodeObj *pVnode) { syncStop(sync); } - vTrace("vgId:%d, vnode will cleanup, refCount:%d", pVnode->vgId, pVnode->refCount); + vDebug("vgId:%d, vnode will cleanup, refCount:%d pVnode:%p data:%p", pVnode->vgId, pVnode->refCount, pVnode, + pVnode->ppVnode); // release local resources only after cutting off outside connections qQueryMgmtNotifyClosed(pVnode->qMgmt); @@ -613,17 +632,19 @@ static int vnodeResetTsdb(SVnodeObj *pVnode) char rootDir[128] = "\0"; sprintf(rootDir, "%s/tsdb", pVnode->rootDir); - if (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_RESET) != TAOS_VN_STATUS_READY) + if (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_RESET) != TAOS_VN_STATUS_READY) { return -1; + } void *tsdb = pVnode->tsdb; pVnode->tsdb = NULL; // acquire vnode - int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1); + int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1); - if (refCount > 2) + if (refCount > 2) { tsem_wait(&pVnode->sem); + } // close tsdb, then open tsdb tsdbCloseRepo(tsdb, 0); @@ -1008,3 +1029,9 @@ PARSE_OVER: if (fp) fclose(fp); return terrno; } + +static void vnodeFreeVnodeObj(void *data) { + SVnodeObj *pVnode = *(SVnodeObj **)data; + vDebug("vgId:%d, vnode is destroyed, pVnode:%p data:%p", pVnode->vgId, pVnode, pVnode->ppVnode); + //taosTFree(pVnode); +} \ No newline at end of file diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index baaeae2a81d28331454eb414106eabe1ef64d939..7d8fa1eea8ffd00f03382926c9116000a8febbfa 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -74,7 +74,7 @@ static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle) { pRead->contLen = 0; pRead->rpcMsg.handle = NULL; - atomic_add_fetch_32(&pVnode->refCount, 1); + vnodeAcquire(pVnode->vgId); vDebug("QInfo:%p add to vread queue for exec query, msg:%p", *qhandle, pRead); taosWriteQitem(pVnode->rqueue, TAOS_QTYPE_QUERY, pRead); diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index c4924f312f4bf16b6c536d58dba75d92d1f75c85..02f1806fa553156280c1e56c3ceb529e47eed239 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -201,8 +201,9 @@ int vnodeWriteCqMsgToQueue(void *param, void *data, int type) { SWalHead *pWal = (SWalHead *)(pSync + 1); memcpy(pWal, pHead, size); - atomic_add_fetch_32(&pVnode->refCount, 1); - vDebug("CQ: vgId:%d, get vnode wqueue, refCount:%d", pVnode->vgId, pVnode->refCount); + vnodeAcquire(pVnode->vgId); + vTrace("CQ: vgId:%d, get vnode wqueue, refCount:%d pVnode:%p data:%p", pVnode->vgId, pVnode->refCount, pVnode, + pVnode->ppVnode); taosWriteQitem(pVnode->wqueue, type, pSync); @@ -218,8 +219,9 @@ int vnodeWriteToQueue(void *param, void *data, int type) { SWalHead *pWal = (SWalHead *)taosAllocateQitem(size); memcpy(pWal, pHead, size); - atomic_add_fetch_32(&pVnode->refCount, 1); - vDebug("vgId:%d, get vnode wqueue, refCount:%d", pVnode->vgId, pVnode->refCount); + vnodeAcquire(pVnode->vgId); + vTrace("vgId:%d, get vnode wqueue, refCount:%d pVnode:%p data:%p", pVnode->vgId, pVnode->refCount, pVnode, + pVnode->ppVnode); taosWriteQitem(pVnode->wqueue, type, pWal);