diff --git a/src/util/inc/tcache.h b/src/util/inc/tcache.h index efd51f90ce8739050971856dd4f2dbdd1c44d5a4..d5930956f4e02446527f3bd8961f9dec9abb97aa 100644 --- a/src/util/inc/tcache.h +++ b/src/util/inc/tcache.h @@ -114,13 +114,21 @@ SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool ext void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const void *pData, size_t dataSize, int durationMS); /** - * get data from cache + * get data from cache, add ref count * @param pCacheObj cache object * @param key key * @return cached data or NULL */ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen); +/** + * get data from cache, keep ref not changed + * @param pCacheObj cache object + * @param key key + * @return cached data or NULL + */ +void *taosCacheGetByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen); + /** * Add one reference count for the exist data, and assign this data for a new owner. * The new owner needs to invoke the taosCacheRelease when it does not need this data anymore. diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c index 6e20c1708dfc81728c6b961b9259d50e953b4b9d..a0204e77c8d06fc1bb2e62aa3a6c8223317dac57 100644 --- a/src/util/src/tcache.c +++ b/src/util/src/tcache.c @@ -267,7 +267,7 @@ static void incRefFn(void* ptNode) { assert(ret > 0); } -void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen) { +static void *taosCacheAcquireByKeyImp(SCacheObj *pCacheObj, const void *key, size_t keyLen, bool changeRef) { if (pCacheObj == NULL || pCacheObj->deleting == 1) { return NULL; } @@ -283,10 +283,10 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen void* pData = (ptNode != NULL)? ptNode->data:NULL; if (pData != NULL) { - atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1); + if (changeRef) atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1); uDebug("cache:%s, key:%p, %p is retrieved from cache, refcnt:%d", pCacheObj->name, key, pData, T_REF_VAL_GET(ptNode)); } else { - atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1); + if (changeRef) atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1); uDebug("cache:%s, key:%p, not in cache, retrieved failed", pCacheObj->name, key); } @@ -294,6 +294,14 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen return pData; } +void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen) { + return taosCacheAcquireByKeyImp(pCacheObj, key, keyLen, true); +} + +void *taosCacheGetByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen) { + return taosCacheAcquireByKeyImp(pCacheObj, key, keyLen, false); +} + void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) { if (pCacheObj == NULL || data == NULL) return NULL; diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 222c89f8e584b2c3e5b835a0cc29171aa3c10a82..da6dbcd4b36e8eba09488284c3a37670c85301d3 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -236,10 +236,6 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { pVnode->accessState = TSDB_VN_ALL_ACCCESS; tsem_init(&pVnode->sem, 0, 0); - pVnode->ppVnode = taosCachePut(tsDnodeVnodesCache, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *), 8); - vDebug("vgId:%d, vnode is opened in %s, pVnode:%p data:%p", pVnode->vgId, rootDir, pVnode, pVnode->ppVnode); - assert(pVnode->ppVnode != NULL); - int32_t code = vnodeReadCfg(pVnode); if (code != TSDB_CODE_SUCCESS) { vnodeCleanUp(pVnode); @@ -342,6 +338,10 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { pVnode->events = NULL; pVnode->status = TAOS_VN_STATUS_READY; + pVnode->ppVnode = taosCachePut(tsDnodeVnodesCache, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *), 8); + vDebug("vgId:%d, vnode is opened in %s, pVnode:%p data:%p", pVnode->vgId, rootDir, pVnode, pVnode->ppVnode); + assert(pVnode->ppVnode != NULL); + return TSDB_CODE_SUCCESS; } @@ -361,11 +361,8 @@ void vnodeRelease(void *pVnodeRaw) { int32_t vgId = pVnode->vgId; int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1); - assert(refCount >= 0); - - void **ppVnode = pVnode->ppVnode; - taosCacheRelease(tsDnodeVnodesCache, (void **)(&ppVnode), false); vTrace("vgId:%d, release vnode, refCount:%d pVnode:%p data:%p", vgId, refCount, pVnode, pVnode->ppVnode); + assert(refCount >= 0); if (refCount > 0) { if (pVnode->status == TAOS_VN_STATUS_RESET && refCount == 2) { @@ -374,6 +371,11 @@ void vnodeRelease(void *pVnodeRaw) { return; } + void **ppVnode = pVnode->ppVnode; + if (ppVnode != NULL) { + taosCacheRelease(tsDnodeVnodesCache, (void **)(&ppVnode), false); + } + if (pVnode->qMgmt) { qCleanupQueryMgmt(pVnode->qMgmt); pVnode->qMgmt = NULL; @@ -433,7 +435,7 @@ void vnodeRelease(void *pVnodeRaw) { } void *vnodeAcquire(int32_t vgId) { - SVnodeObj **ppVnode = taosCacheAcquireByKey(tsDnodeVnodesCache, &vgId, sizeof(int32_t)); + SVnodeObj **ppVnode = taosCacheGetByKey(tsDnodeVnodesCache, &vgId, sizeof(int32_t)); if (ppVnode == NULL || *ppVnode == NULL) { terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; @@ -442,6 +444,12 @@ void *vnodeAcquire(int32_t vgId) { } SVnodeObj *pVnode = *ppVnode; + if (pVnode->refCount <= 0) { + terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; + vDebug("vgId:%d, not exist for refCount is %d", vgId, pVnode->refCount); + return NULL; + } + atomic_add_fetch_32(&pVnode->refCount, 1); vTrace("vgId:%d, get vnode, refCount:%d pVnode:%p data:%p", pVnode->vgId, pVnode->refCount, pVnode, pVnode->ppVnode); diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index 7d8fa1eea8ffd00f03382926c9116000a8febbfa..baaeae2a81d28331454eb414106eabe1ef64d939 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -74,7 +74,7 @@ static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle) { pRead->contLen = 0; pRead->rpcMsg.handle = NULL; - vnodeAcquire(pVnode->vgId); + atomic_add_fetch_32(&pVnode->refCount, 1); vDebug("QInfo:%p add to vread queue for exec query, msg:%p", *qhandle, pRead); taosWriteQitem(pVnode->rqueue, TAOS_QTYPE_QUERY, pRead); diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 02f1806fa553156280c1e56c3ceb529e47eed239..07d69925d9cb92ffef4a2387547686f2a94a6544 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -201,7 +201,7 @@ int vnodeWriteCqMsgToQueue(void *param, void *data, int type) { SWalHead *pWal = (SWalHead *)(pSync + 1); memcpy(pWal, pHead, size); - vnodeAcquire(pVnode->vgId); + atomic_add_fetch_32(&pVnode->refCount, 1); vTrace("CQ: vgId:%d, get vnode wqueue, refCount:%d pVnode:%p data:%p", pVnode->vgId, pVnode->refCount, pVnode, pVnode->ppVnode); @@ -219,7 +219,7 @@ int vnodeWriteToQueue(void *param, void *data, int type) { SWalHead *pWal = (SWalHead *)taosAllocateQitem(size); memcpy(pWal, pHead, size); - vnodeAcquire(pVnode->vgId); + atomic_add_fetch_32(&pVnode->refCount, 1); vTrace("vgId:%d, get vnode wqueue, refCount:%d pVnode:%p data:%p", pVnode->vgId, pVnode->refCount, pVnode, pVnode->ppVnode);