提交 ebb7afd6 编写于 作者: S Shengliang Guan

TD-1746

上级 73db12c7
...@@ -114,13 +114,21 @@ SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool ext ...@@ -114,13 +114,21 @@ SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool ext
void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const void *pData, size_t dataSize, int durationMS); void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const void *pData, size_t dataSize, int durationMS);
/** /**
* get data from cache * get data from cache, add ref count
* @param pCacheObj cache object * @param pCacheObj cache object
* @param key key * @param key key
* @return cached data or NULL * @return cached data or NULL
*/ */
void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen); void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen);
/**
* get data from cache, keep ref not changed
* @param pCacheObj cache object
* @param key key
* @return cached data or NULL
*/
void *taosCacheGetByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen);
/** /**
* Add one reference count for the exist data, and assign this data for a new owner. * Add one reference count for the exist data, and assign this data for a new owner.
* The new owner needs to invoke the taosCacheRelease when it does not need this data anymore. * The new owner needs to invoke the taosCacheRelease when it does not need this data anymore.
......
...@@ -267,7 +267,7 @@ static void incRefFn(void* ptNode) { ...@@ -267,7 +267,7 @@ static void incRefFn(void* ptNode) {
assert(ret > 0); assert(ret > 0);
} }
void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen) { static void *taosCacheAcquireByKeyImp(SCacheObj *pCacheObj, const void *key, size_t keyLen, bool changeRef) {
if (pCacheObj == NULL || pCacheObj->deleting == 1) { if (pCacheObj == NULL || pCacheObj->deleting == 1) {
return NULL; return NULL;
} }
...@@ -283,10 +283,10 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen ...@@ -283,10 +283,10 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen
void* pData = (ptNode != NULL)? ptNode->data:NULL; void* pData = (ptNode != NULL)? ptNode->data:NULL;
if (pData != NULL) { if (pData != NULL) {
atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1); if (changeRef) atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1);
uDebug("cache:%s, key:%p, %p is retrieved from cache, refcnt:%d", pCacheObj->name, key, pData, T_REF_VAL_GET(ptNode)); uDebug("cache:%s, key:%p, %p is retrieved from cache, refcnt:%d", pCacheObj->name, key, pData, T_REF_VAL_GET(ptNode));
} else { } else {
atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1); if (changeRef) atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1);
uDebug("cache:%s, key:%p, not in cache, retrieved failed", pCacheObj->name, key); uDebug("cache:%s, key:%p, not in cache, retrieved failed", pCacheObj->name, key);
} }
...@@ -294,6 +294,14 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen ...@@ -294,6 +294,14 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen
return pData; return pData;
} }
void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen) {
return taosCacheAcquireByKeyImp(pCacheObj, key, keyLen, true);
}
void *taosCacheGetByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen) {
return taosCacheAcquireByKeyImp(pCacheObj, key, keyLen, false);
}
void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) { void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) {
if (pCacheObj == NULL || data == NULL) return NULL; if (pCacheObj == NULL || data == NULL) return NULL;
......
...@@ -236,10 +236,6 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { ...@@ -236,10 +236,6 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
pVnode->accessState = TSDB_VN_ALL_ACCCESS; pVnode->accessState = TSDB_VN_ALL_ACCCESS;
tsem_init(&pVnode->sem, 0, 0); tsem_init(&pVnode->sem, 0, 0);
pVnode->ppVnode = taosCachePut(tsDnodeVnodesCache, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *), 8);
vDebug("vgId:%d, vnode is opened in %s, pVnode:%p data:%p", pVnode->vgId, rootDir, pVnode, pVnode->ppVnode);
assert(pVnode->ppVnode != NULL);
int32_t code = vnodeReadCfg(pVnode); int32_t code = vnodeReadCfg(pVnode);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
vnodeCleanUp(pVnode); vnodeCleanUp(pVnode);
...@@ -342,6 +338,10 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { ...@@ -342,6 +338,10 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
pVnode->events = NULL; pVnode->events = NULL;
pVnode->status = TAOS_VN_STATUS_READY; pVnode->status = TAOS_VN_STATUS_READY;
pVnode->ppVnode = taosCachePut(tsDnodeVnodesCache, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *), 8);
vDebug("vgId:%d, vnode is opened in %s, pVnode:%p data:%p", pVnode->vgId, rootDir, pVnode, pVnode->ppVnode);
assert(pVnode->ppVnode != NULL);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -361,11 +361,8 @@ void vnodeRelease(void *pVnodeRaw) { ...@@ -361,11 +361,8 @@ void vnodeRelease(void *pVnodeRaw) {
int32_t vgId = pVnode->vgId; int32_t vgId = pVnode->vgId;
int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1); int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1);
assert(refCount >= 0);
void **ppVnode = pVnode->ppVnode;
taosCacheRelease(tsDnodeVnodesCache, (void **)(&ppVnode), false);
vTrace("vgId:%d, release vnode, refCount:%d pVnode:%p data:%p", vgId, refCount, pVnode, pVnode->ppVnode); vTrace("vgId:%d, release vnode, refCount:%d pVnode:%p data:%p", vgId, refCount, pVnode, pVnode->ppVnode);
assert(refCount >= 0);
if (refCount > 0) { if (refCount > 0) {
if (pVnode->status == TAOS_VN_STATUS_RESET && refCount == 2) { if (pVnode->status == TAOS_VN_STATUS_RESET && refCount == 2) {
...@@ -374,6 +371,11 @@ void vnodeRelease(void *pVnodeRaw) { ...@@ -374,6 +371,11 @@ void vnodeRelease(void *pVnodeRaw) {
return; return;
} }
void **ppVnode = pVnode->ppVnode;
if (ppVnode != NULL) {
taosCacheRelease(tsDnodeVnodesCache, (void **)(&ppVnode), false);
}
if (pVnode->qMgmt) { if (pVnode->qMgmt) {
qCleanupQueryMgmt(pVnode->qMgmt); qCleanupQueryMgmt(pVnode->qMgmt);
pVnode->qMgmt = NULL; pVnode->qMgmt = NULL;
...@@ -433,7 +435,7 @@ void vnodeRelease(void *pVnodeRaw) { ...@@ -433,7 +435,7 @@ void vnodeRelease(void *pVnodeRaw) {
} }
void *vnodeAcquire(int32_t vgId) { void *vnodeAcquire(int32_t vgId) {
SVnodeObj **ppVnode = taosCacheAcquireByKey(tsDnodeVnodesCache, &vgId, sizeof(int32_t)); SVnodeObj **ppVnode = taosCacheGetByKey(tsDnodeVnodesCache, &vgId, sizeof(int32_t));
if (ppVnode == NULL || *ppVnode == NULL) { if (ppVnode == NULL || *ppVnode == NULL) {
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
...@@ -442,6 +444,12 @@ void *vnodeAcquire(int32_t vgId) { ...@@ -442,6 +444,12 @@ void *vnodeAcquire(int32_t vgId) {
} }
SVnodeObj *pVnode = *ppVnode; SVnodeObj *pVnode = *ppVnode;
if (pVnode->refCount <= 0) {
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
vDebug("vgId:%d, not exist for refCount is %d", vgId, pVnode->refCount);
return NULL;
}
atomic_add_fetch_32(&pVnode->refCount, 1); atomic_add_fetch_32(&pVnode->refCount, 1);
vTrace("vgId:%d, get vnode, refCount:%d pVnode:%p data:%p", pVnode->vgId, pVnode->refCount, pVnode, pVnode->ppVnode); vTrace("vgId:%d, get vnode, refCount:%d pVnode:%p data:%p", pVnode->vgId, pVnode->refCount, pVnode, pVnode->ppVnode);
......
...@@ -74,7 +74,7 @@ static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle) { ...@@ -74,7 +74,7 @@ static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle) {
pRead->contLen = 0; pRead->contLen = 0;
pRead->rpcMsg.handle = NULL; pRead->rpcMsg.handle = NULL;
vnodeAcquire(pVnode->vgId); atomic_add_fetch_32(&pVnode->refCount, 1);
vDebug("QInfo:%p add to vread queue for exec query, msg:%p", *qhandle, pRead); vDebug("QInfo:%p add to vread queue for exec query, msg:%p", *qhandle, pRead);
taosWriteQitem(pVnode->rqueue, TAOS_QTYPE_QUERY, pRead); taosWriteQitem(pVnode->rqueue, TAOS_QTYPE_QUERY, pRead);
......
...@@ -201,7 +201,7 @@ int vnodeWriteCqMsgToQueue(void *param, void *data, int type) { ...@@ -201,7 +201,7 @@ int vnodeWriteCqMsgToQueue(void *param, void *data, int type) {
SWalHead *pWal = (SWalHead *)(pSync + 1); SWalHead *pWal = (SWalHead *)(pSync + 1);
memcpy(pWal, pHead, size); memcpy(pWal, pHead, size);
vnodeAcquire(pVnode->vgId); atomic_add_fetch_32(&pVnode->refCount, 1);
vTrace("CQ: vgId:%d, get vnode wqueue, refCount:%d pVnode:%p data:%p", pVnode->vgId, pVnode->refCount, pVnode, vTrace("CQ: vgId:%d, get vnode wqueue, refCount:%d pVnode:%p data:%p", pVnode->vgId, pVnode->refCount, pVnode,
pVnode->ppVnode); pVnode->ppVnode);
...@@ -219,7 +219,7 @@ int vnodeWriteToQueue(void *param, void *data, int type) { ...@@ -219,7 +219,7 @@ int vnodeWriteToQueue(void *param, void *data, int type) {
SWalHead *pWal = (SWalHead *)taosAllocateQitem(size); SWalHead *pWal = (SWalHead *)taosAllocateQitem(size);
memcpy(pWal, pHead, size); memcpy(pWal, pHead, size);
vnodeAcquire(pVnode->vgId); atomic_add_fetch_32(&pVnode->refCount, 1);
vTrace("vgId:%d, get vnode wqueue, refCount:%d pVnode:%p data:%p", pVnode->vgId, pVnode->refCount, pVnode, vTrace("vgId:%d, get vnode wqueue, refCount:%d pVnode:%p data:%p", pVnode->vgId, pVnode->refCount, pVnode,
pVnode->ppVnode); pVnode->ppVnode);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册