diff --git a/src/inc/query.h b/src/inc/query.h index ec1e458b628a60f0db86a48e9da9af7257454ed2..0c18f85dc31bae5e77bae7228d5390a8d32df07a 100644 --- a/src/inc/query.h +++ b/src/inc/query.h @@ -76,6 +76,9 @@ void* qGetResultRetrieveMsg(qinfo_t qinfo); */ int32_t qKillQuery(qinfo_t qinfo); +int32_t qQueryCompleted(qinfo_t qinfo); + + /** * destroy query info structure * @param qHandle diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index b5fc67e4f17a5aa513409043f2356c21275ce844..0ffb1a4cde76b7a6326ef4f54f26aea2d662d7d9 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -6432,34 +6432,6 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex return code; } -bool qHasMoreResultsToRetrieve(qinfo_t qinfo) { - SQInfo *pQInfo = (SQInfo *)qinfo; - - if (!isValidQInfo(pQInfo) || pQInfo->code != TSDB_CODE_SUCCESS) { - qDebug("QInfo:%p invalid qhandle or error occurs, abort query, code:%x", pQInfo, pQInfo->code); - return false; - } - - SQuery *pQuery = pQInfo->runtimeEnv.pQuery; - - bool ret = false; - if (Q_STATUS_EQUAL(pQuery->status, QUERY_OVER)) { - ret = false; - } else if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) { - ret = true; - } else if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { - ret = true; - } else { - assert(0); - } - - if (ret) { - qDebug("QInfo:%p has more results waits for client retrieve", pQInfo); - } - - return ret; -} - int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *contLen, bool* continueExec) { SQInfo *pQInfo = (SQInfo *)qinfo; @@ -6487,11 +6459,11 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co int32_t code = pQInfo->code; if (code == TSDB_CODE_SUCCESS) { - (*pRsp)->offset = htobe64(pQuery->limit.offset); + (*pRsp)->offset = htobe64(pQuery->limit.offset); (*pRsp)->useconds = htobe64(pRuntimeEnv->summary.elapsedTime); } else { - (*pRsp)->useconds = 0; - (*pRsp)->offset = 0; + (*pRsp)->offset = 0; + (*pRsp)->useconds = htobe64(pRuntimeEnv->summary.elapsedTime); } (*pRsp)->precision = htons(pQuery->precision); @@ -6503,22 +6475,30 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co } pQInfo->rspContext = NULL; - pQInfo->dataReady = QUERY_RESULT_NOT_READY; + pQInfo->dataReady = QUERY_RESULT_NOT_READY; if (IS_QUERY_KILLED(pQInfo) || Q_STATUS_EQUAL(pQuery->status, QUERY_OVER)) { + *continueExec = false; (*pRsp)->completed = 1; // notify no more result to client - } - - if (qHasMoreResultsToRetrieve(pQInfo)) { + } else { *continueExec = true; - } else { // failed to dump result, free qhandle immediately - *continueExec = false; - qKillQuery(pQInfo); + qDebug("QInfo:%p has more results waits for client retrieve", pQInfo); } return code; } +int32_t qQueryCompleted(qinfo_t qinfo) { + SQInfo *pQInfo = (SQInfo *)qinfo; + + if (pQInfo == NULL || !isValidQInfo(pQInfo)) { + return TSDB_CODE_QRY_INVALID_QHANDLE; + } + + SQuery* pQuery = pQInfo->runtimeEnv.pQuery; + return IS_QUERY_KILLED(pQInfo) || Q_STATUS_EQUAL(pQuery->status, QUERY_OVER); +} + int32_t qKillQuery(qinfo_t qinfo) { SQInfo *pQInfo = (SQInfo *)qinfo; diff --git a/src/util/src/hash.c b/src/util/src/hash.c index 06326e037e0ae8988786bf11f40f1f782609909f..96b4e9cd28a57ae23e61697faf435cf08c768e75 100644 --- a/src/util/src/hash.c +++ b/src/util/src/hash.c @@ -353,24 +353,32 @@ int32_t taosHashRemoveWithData(SHashObj *pHashObj, const void *key, size_t keyLe int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity); SHashEntry *pe = pHashObj->hashList[slot]; + // no data, return directly + if (pe->num == 0) { + assert(pe->next == NULL); + __rd_unlock(&pHashObj->lock, pHashObj->type); + return -1; + } + + if (pHashObj->type == HASH_ENTRY_LOCK) { + taosWLockLatch(&pe->latch); + } + if (pe->num == 0) { assert(pe->next == NULL); } else { assert(pe->next != NULL); } - // no data, return directly + // double check after locked if (pe->num == 0) { assert(pe->next == NULL); + taosWUnLockLatch(&pe->latch); __rd_unlock(&pHashObj->lock, pHashObj->type); return -1; } - if (pHashObj->type == HASH_ENTRY_LOCK) { - taosWLockLatch(&pe->latch); - } - SHashNode *pNode = pe->next; SHashNode *pRes = NULL; diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c index b5f8515f789a5ea19134ce626da4940283017378..8a4145f2f85020b6513996318dfc8bd21d578def 100644 --- a/src/util/src/tcache.c +++ b/src/util/src/tcache.c @@ -438,8 +438,8 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { char* key = pNode->key; char* d = pNode->data; - int32_t ref = T_REF_DEC(pNode); - uDebug("cache:%s, key:%p, %p is released, refcnt:%d", pCacheObj->name, key, d, ref); + int32_t ref = T_REF_VAL_GET(pNode); + uDebug("cache:%s, key:%p, %p is released, refcnt:%d", pCacheObj->name, key, d, ref - 1); /* * If it is not referenced by other users, remove it immediately. Otherwise move this node to trashcan wait for all users @@ -449,6 +449,8 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { * that tries to do the same thing. */ if (inTrashCan) { + ref = T_REF_DEC(pNode); + if (ref == 0) { assert(pNode->pTNodeHeader->pData == pNode); @@ -459,7 +461,10 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { doDestroyTrashcanElem(pCacheObj, pNode->pTNodeHeader); } } else { + // NOTE: remove it from hash in the first place, otherwise, the pNode may have been released by other thread + // when reaches here. int32_t ret = taosHashRemove(pCacheObj->pHashTable, pNode->key, pNode->keySize); + ref = T_REF_DEC(pNode); // successfully remove from hash table, if failed, this node must have been move to trash already, do nothing. // note that the remove operation can be executed only once. diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index 5c7bc869baad69487cacb6f31502292ce818ae5f..0d30be76628e1c05401f59e72283f01b347bd4e6 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -66,7 +66,7 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) { return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg); } -static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void *qhandle) { +static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle) { SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY; pRead->pCont = qhandle; @@ -75,22 +75,22 @@ static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void *qhandle) { atomic_add_fetch_32(&pVnode->refCount, 1); - vDebug("QInfo:%p add to query task queue for exec, msg:%p", qhandle, pRead); + vDebug("QInfo:%p add to vread queue for exec query, msg:%p", *qhandle, pRead); taosWriteQitem(pVnode->rqueue, TAOS_QTYPE_QUERY, pRead); } -static int32_t vnodeDumpQueryResult(SRspRet *pRet, void* pVnode, void* handle, bool* freeHandle) { +static int32_t vnodeDumpQueryResult(SRspRet *pRet, void* pVnode, void** handle, bool* freeHandle) { bool continueExec = false; int32_t code = TSDB_CODE_SUCCESS; - if ((code = qDumpRetrieveResult(handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len, &continueExec)) == TSDB_CODE_SUCCESS) { + if ((code = qDumpRetrieveResult(*handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len, &continueExec)) == TSDB_CODE_SUCCESS) { if (continueExec) { *freeHandle = false; vnodePutItemIntoReadQueue(pVnode, handle); - pRet->qhandle = handle; + pRet->qhandle = *handle; } else { *freeHandle = true; - vDebug("QInfo:%p exec completed, free handle:%d", handle, *freeHandle); + vDebug("QInfo:%p exec completed, free handle:%d", *handle, *freeHandle); } } else { pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); @@ -181,50 +181,45 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { if (handle != NULL) { vDebug("vgId:%d, QInfo:%p, dnode query msg disposed, create qhandle and returns to app", vgId, *handle); - vnodePutItemIntoReadQueue(pVnode, *handle); + vnodePutItemIntoReadQueue(pVnode, handle); } } else { assert(pCont != NULL); + void** qhandle = (void**) pCont; +// *handle = /*(void*) */pCont; - handle = qAcquireQInfo(pVnode->qMgmt, (uint64_t) pCont); - if (handle == NULL) { - vWarn("QInfo:%p invalid qhandle in continuing exec query, conn:%p", (void*) pCont, pReadMsg->rpcMsg.handle); - code = TSDB_CODE_QRY_INVALID_QHANDLE; - } else { - vDebug("vgId:%d, QInfo:%p, dnode continues to exec query", pVnode->vgId, (void*) pCont); +// handle = qAcquireQInfo(pVnode->qMgmt, (uint64_t) pCont); +// if (handle == NULL) { +// vWarn("QInfo:%p invalid qhandle in continuing exec query, conn:%p", (void*) pCont, pReadMsg->rpcMsg.handle); +// code = TSDB_CODE_QRY_INVALID_QHANDLE; +// } else { + vDebug("vgId:%d, QInfo:%p, dnode continues to exec query", pVnode->vgId, *qhandle); bool freehandle = false; - bool buildRes = qTableQuery(*handle); // do execute query + bool buildRes = qTableQuery(*qhandle); // do execute query // build query rsp, the retrieve request has reached here already if (buildRes) { // update the connection info according to the retrieve connection - pReadMsg->rpcMsg.handle = qGetResultRetrieveMsg(*handle); + pReadMsg->rpcMsg.handle = qGetResultRetrieveMsg(*qhandle); assert(pReadMsg->rpcMsg.handle != NULL); - vDebug("vgId:%d, QInfo:%p, start to build retrieval rsp after query paused, %p", pVnode->vgId, *handle, + vDebug("vgId:%d, QInfo:%p, start to build retrieval rsp after query paused, %p", pVnode->vgId, *qhandle, pReadMsg->rpcMsg.handle); - code = vnodeDumpQueryResult(&pReadMsg->rspRet, pVnode, *handle, &freehandle); + code = vnodeDumpQueryResult(&pReadMsg->rspRet, pVnode, qhandle, &freehandle); // todo test the error code case if (code == TSDB_CODE_SUCCESS) { code = TSDB_CODE_QRY_HAS_RSP; } + } else { + freehandle = qQueryCompleted(*qhandle); } - // If retrieval request has not arrived, release the qhandle and decrease the reference count to allow - // the queryMgmt to free it when expired - void** dup = handle; - qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false); - - // NOTE: - // if the qhandle is put into query vread queue and wait to be executed by worker in read queue, - // the reference count of qhandle can not be decreased. Otherwise, qhandle may be released before or in the - // procedure of query execution - if (freehandle) { - qReleaseQInfo(pVnode->qMgmt, (void **)&dup, freehandle); + // NOTE: if the qhandle is not put into vread queue or query is completed, free the qhandle. + if (freehandle || (!buildRes)) { + qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, freehandle); } - } } return code; @@ -269,7 +264,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { //TODO handle malloc failure pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp)); - qReleaseQInfo(pVnode->qMgmt, (void**) &handle, freeHandle); + qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true); } else { // result is not ready, return immediately if (!buildRes) { qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false); @@ -277,12 +272,12 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { } void** dup = handle; - code = vnodeDumpQueryResult(pRet, pVnode, *handle, &freeHandle); + code = vnodeDumpQueryResult(pRet, pVnode, handle, &freeHandle); qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false); - // not added into task queue, free it immediate + // not added into task queue, the query must be completed already, free qhandle immediate if (freeHandle) { - qReleaseQInfo(pVnode->qMgmt, (void**) &dup, freeHandle); + qReleaseQInfo(pVnode->qMgmt, (void**) &dup, true); } }