diff --git a/src/inc/query.h b/src/inc/query.h index c648270b21cbf906b21e1f43343e4f9cf6864be3..d201b649f9de6aa55e0b9f1e0aa8c9aff11a6092 100644 --- a/src/inc/query.h +++ b/src/inc/query.h @@ -20,7 +20,6 @@ extern "C" { #endif typedef void* qinfo_t; -typedef void (*_qinfo_free_fn_t)(void*); /** * create the qinfo object according to QueryTableMsg @@ -29,13 +28,8 @@ typedef void (*_qinfo_free_fn_t)(void*); * @param qinfo * @return */ -int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryTableMsg, void* param, _qinfo_free_fn_t fn, qinfo_t* qinfo); +int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryTableMsg, void* param, qinfo_t* qinfo); -/** - * Destroy QInfo object - * @param qinfo qhandle - */ -void qDestroyQueryInfo(qinfo_t qinfo); /** * the main query execution function, including query on both table and multitables, @@ -84,8 +78,14 @@ bool qHasMoreResultsToRetrieve(qinfo_t qinfo); */ int32_t qKillQuery(qinfo_t qinfo); +/** + * destroy query info structure + * @param qHandle + */ +void qDestroyQueryInfo(qinfo_t qHandle); + void* qOpenQueryMgmt(int32_t vgId); -void qSetQueryMgmtClosed(void* pExecutor); +void qQueryMgmtNotifyClosed(void* pExecutor); void qCleanupQueryMgmt(void* pExecutor); void** qRegisterQInfo(void* pMgmt, uint64_t qInfo); void** qAcquireQInfo(void* pMgmt, uint64_t key); diff --git a/src/util/inc/tcache.h b/src/util/inc/tcache.h index 2982b8dc706e160eaf1aa2ab432c842dd68e351a..3da604d15298ffeec085c5efcfc3cc8ed856fdea 100644 --- a/src/util/inc/tcache.h +++ b/src/util/inc/tcache.h @@ -33,17 +33,20 @@ typedef struct SCacheStatis { int64_t refreshCount; } SCacheStatis; +struct STrashElem; + typedef struct SCacheDataNode { - uint64_t addedTime; // the added time when this element is added or updated into cache - uint64_t lifespan; // expiredTime expiredTime when this element should be remove from cache - uint64_t signature; - uint32_t size; // allocated size for current SCacheDataNode + uint64_t addedTime; // the added time when this element is added or updated into cache + uint64_t lifespan; // life duration when this element should be remove from cache + uint64_t expireTime; // expire time + uint64_t signature; + struct STrashElem *pTNodeHeader; // point to trash node head + uint16_t keySize: 15; // max key size: 32kb + bool inTrashCan: 1;// denote if it is in trash or not + uint32_t size; // allocated size for current SCacheDataNode T_REF_DECLARE() - uint16_t keySize: 15; // max key size: 32kb - bool inTrashCan: 1;// denote if it is in trash or not - int32_t extendFactor; // number of life span extend - char *key; - char data[]; + char *key; + char data[]; } SCacheDataNode; typedef struct STrashElem { diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c index 688e49a40b9d03b9521396e234a6fa0d68d6fce1..d3c622633de7cdebfe43a7492f7a882823dec0e2 100644 --- a/src/util/src/tcache.c +++ b/src/util/src/tcache.c @@ -116,11 +116,13 @@ static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNo return; } - int32_t size = pNode->size; taosHashRemove(pCacheObj->pHashTable, pNode->key, pNode->keySize); + pCacheObj->totalSize -= pNode->size; uDebug("cache:%s, key:%p, %p is destroyed from cache, totalNum:%d totalSize:%" PRId64 "bytes size:%dbytes", - pCacheObj->name, pNode->key, pNode->data, (int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, size); + pCacheObj->name, pNode->key, pNode->data, (int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, + pNode->size); + if (pCacheObj->freeFp) pCacheObj->freeFp(pNode->data); free(pNode); } @@ -285,7 +287,7 @@ void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const v uDebug("cache:%s, key:%p, %p added into cache, added:%" PRIu64 ", expire:%" PRIu64 ", totalNum:%d totalSize:%" PRId64 "bytes size:%" PRId64 "bytes", - pCacheObj->name, key, pNode->data, pNode->addedTime, (pNode->lifespan * pNode->extendFactor + pNode->addedTime), + pCacheObj->name, key, pNode->data, pNode->addedTime, pNode->expireTime, (int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, dataSize); } else { uError("cache:%s, key:%p, failed to added into cache, out of memory", pCacheObj->name, key); @@ -312,16 +314,6 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen int32_t ref = 0; if (ptNode != NULL) { ref = T_REF_INC(*ptNode); - - // if the remained life span is less then the (*ptNode)->lifeSpan, add up one lifespan - if (pCacheObj->extendLifespan) { - int64_t now = taosGetTimestampMs(); - - if ((now - (*ptNode)->addedTime) < (*ptNode)->lifespan * (*ptNode)->extendFactor) { - (*ptNode)->extendFactor += 1; - uDebug("key:%p extend life time to %"PRId64, key, (*ptNode)->lifespan * (*ptNode)->extendFactor + (*ptNode)->addedTime); - } - } } __cache_unlock(pCacheObj); @@ -347,8 +339,7 @@ void* taosCacheUpdateExpireTimeByName(SCacheObj *pCacheObj, void *key, size_t ke SCacheDataNode **ptNode = (SCacheDataNode **)taosHashGet(pCacheObj->pHashTable, key, keyLen); if (ptNode != NULL) { T_REF_INC(*ptNode); - (*ptNode)->extendFactor += 1; -// (*ptNode)->lifespan = expireTime; + (*ptNode)->expireTime = taosGetTimestampMs() + (*ptNode)->lifespan; } __cache_unlock(pCacheObj); @@ -380,17 +371,6 @@ void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) { int32_t ref = T_REF_INC(ptNode); uDebug("cache:%s, data: %p acquired by data in cache, refcnt:%d", pCacheObj->name, ptNode->data, ref); - // if the remained life span is less then the (*ptNode)->lifeSpan, add up one lifespan - if (pCacheObj->extendLifespan) { - int64_t now = taosGetTimestampMs(); - - if ((now - ptNode->addedTime) < ptNode->lifespan * ptNode->extendFactor) { - ptNode->extendFactor += 1; - uDebug("cache:%s, %p extend life time to %" PRId64, pCacheObj->name, ptNode->data, - ptNode->lifespan * ptNode->extendFactor + ptNode->addedTime); - } - } - // the data if referenced by at least one object, so the reference count must be greater than the value of 2. assert(ref >= 2); return data; @@ -431,22 +411,58 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { } *data = NULL; - int16_t ref = T_REF_DEC(pNode); - uDebug("cache:%s, key:%p, %p is released, refcnt:%d", pCacheObj->name, pNode->key, pNode->data, ref); - if (_remove && (!pNode->inTrashCan)) { - __cache_wr_lock(pCacheObj); + // note: extend lifespan before dec ref count + if (pCacheObj->extendLifespan) { + atomic_store_64(&pNode->expireTime, pNode->lifespan + taosGetTimestampMs()); + uDebug("cache:%s data:%p extend life time to %"PRId64 " before release", pCacheObj->name, pNode->data, pNode->expireTime); + } + + bool inTrashCan = pNode->inTrashCan; + uDebug("cache:%s, key:%p, %p is released, refcnt:%d", pCacheObj->name, pNode->key, pNode->data, T_REF_VAL_GET(pNode) - 1); - if (T_REF_VAL_GET(pNode) == 0) { - // remove directly, if not referenced by other users - taosCacheReleaseNode(pCacheObj, pNode); - } else { - // pNode may be released immediately by other thread after the reference count of pNode is set to 0, - // So we need to lock it in the first place. - taosCacheMoveToTrash(pCacheObj, pNode); + // NOTE: once refcount is decrease, pNode may be free by other thread immediately. + int32_t ref = T_REF_DEC(pNode); + + if (inTrashCan) { + // Remove it if the ref count is 0. + // The ref count does not need to load and check again after lock acquired, since ref count can not be increased when + // the node is in trashcan. + if (ref == 0) { + __cache_wr_lock(pCacheObj); + assert(pNode->pTNodeHeader->pData == pNode); + taosRemoveFromTrashCan(pCacheObj, pNode->pTNodeHeader); + __cache_unlock(pCacheObj); } - __cache_unlock(pCacheObj); + } else { + assert(pNode->pTNodeHeader == NULL); + + if (_remove) { // not in trash can, but need to remove it + __cache_wr_lock(pCacheObj); + + /* + * If not referenced by other users. Otherwise move this node to trashcan wait for all users + * releasing this resources. + * + * NOTE: previous ref is 0, and current ref is still 0, remove it. If previous is not 0, there is another thread + * that tries to do the same thing. + */ + if (ref == 0) { + if (T_REF_VAL_GET(pNode) == 0) { + taosCacheReleaseNode(pCacheObj, pNode); + } else { + taosCacheMoveToTrash(pCacheObj, pNode); + } + } + + __cache_unlock(pCacheObj); +// } else { // extend its life time +// if (pCacheObj->extendLifespan) { +// atomic_store_64(&pNode->expireTime, pNode->lifespan + taosGetTimestampMs()); +// uDebug("cache:%s data:%p extend life time to %"PRId64 " after release", pCacheObj->name, pNode->data, pNode->expireTime); +// } + } } } @@ -486,7 +502,7 @@ void taosCacheCleanup(SCacheObj *pCacheObj) { SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size, uint64_t duration) { - size_t totalSize = size + sizeof(SCacheDataNode) + keyLen + 1; + size_t totalSize = size + sizeof(SCacheDataNode) + keyLen; SCacheDataNode *pNewNode = calloc(1, totalSize); if (pNewNode == NULL) { @@ -503,7 +519,7 @@ SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char * pNewNode->addedTime = (uint64_t)taosGetTimestampMs(); pNewNode->lifespan = duration; - pNewNode->extendFactor = 1; + pNewNode->expireTime = pNewNode->addedTime + pNewNode->lifespan; pNewNode->signature = (uint64_t)pNewNode; pNewNode->size = (uint32_t)totalSize; @@ -512,6 +528,7 @@ SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char * void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode) { if (pNode->inTrashCan) { /* node is already in trash */ + assert(pNode->pTNodeHeader != NULL && pNode->pTNodeHeader->pData == pNode); return; } @@ -527,6 +544,7 @@ void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode) { pCacheObj->pTrash = pElem; pNode->inTrashCan = true; + pNode->pTNodeHeader = pElem; pCacheObj->numOfElemsInTrash++; uDebug("key:%p, %p move to trash, numOfElem in trash:%d", pNode->key, pNode->data, pCacheObj->numOfElemsInTrash); @@ -629,7 +647,7 @@ static void doCacheRefresh(SCacheObj* pCacheObj, int64_t time, __cache_free_fn_t __cache_wr_lock(pCacheObj); while (taosHashIterNext(pIter)) { SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter); - if ((pNode->addedTime + pNode->lifespan * pNode->extendFactor) <= time && T_REF_VAL_GET(pNode) <= 0) { + if (pNode->expireTime < time && T_REF_VAL_GET(pNode) <= 0) { taosCacheReleaseNode(pCacheObj, pNode); continue; } diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 18c9ebf2e152017f178f2e1cf548f18caebb9e5f..d5221bae10b9a770ff3e1b05722a38bdb3a6a73d 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -508,7 +508,7 @@ static void vnodeCleanUp(SVnodeObj *pVnode) { vTrace("vgId:%d, vnode will cleanup, refCount:%d", pVnode->vgId, pVnode->refCount); // release local resources only after cutting off outside connections - qSetQueryMgmtClosed(pVnode->qMgmt); + qQueryMgmtNotifyClosed(pVnode->qMgmt); vnodeRelease(pVnode); } diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index f054ae390410dc0041825cc8c421b0664dba727d..ff58e219b07fba049a36b14fc97446010499321d 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -82,6 +82,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void*) killQueryMsg->qhandle, pReadMsg->rpcMsg.handle); } else { assert(*qhandle == (void*) killQueryMsg->qhandle); + qKillQuery(*qhandle); qReleaseQInfo(pVnode->qMgmt, (void**) &qhandle, true); } @@ -93,7 +94,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { if (contLen != 0) { qinfo_t pQInfo = NULL; - code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, pVnode, NULL, &pQInfo); + code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, pVnode, &pQInfo); SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp)); pRsp->code = code; @@ -108,9 +109,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { handle = qRegisterQInfo(pVnode->qMgmt, (uint64_t) pQInfo); if (handle == NULL) { // failed to register qhandle pRsp->code = TSDB_CODE_QRY_INVALID_QHANDLE; - - qKillQuery(pQInfo); - qKillQuery(pQInfo); + qDestroyQueryInfo(pQInfo); // destroy it directly } else { assert(*handle == pQInfo); pRsp->qhandle = htobe64((uint64_t) pQInfo); @@ -120,10 +119,6 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { if (handle != NULL && vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) { vError("vgId:%d, QInfo:%p, query discarded since link is broken, %p", pVnode->vgId, *handle, pReadMsg->rpcMsg.handle); pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; - - // NOTE: there two refcount, needs to kill twice - // query has not been put into qhandle pool, kill it directly. - qKillQuery(*handle); qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true); return pRsp->code; } @@ -134,6 +129,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { dnodePutItemIntoReadQueue(pVnode, *handle); qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false); } + vDebug("vgId:%d, QInfo:%p, dnode query msg disposed", vgId, pQInfo); } else { assert(pCont != NULL); @@ -183,6 +179,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { if (pRetrieve->free == 1) { vDebug("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, *handle); + qKillQuery(*handle); qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true); pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); @@ -209,6 +206,9 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { dnodePutItemIntoReadQueue(pVnode, *handle); pRet->qhandle = *handle; freeHandle = false; + } else { + qKillQuery(*handle); + freeHandle = true; } } }