diff --git a/src/inc/query.h b/src/inc/query.h index 49ee5248f5e267db5e67a04e47c18d16dd692613..5fd2ede034ebfaaf86eecce7a429c33996606027 100644 --- a/src/inc/query.h +++ b/src/inc/query.h @@ -32,11 +32,11 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryTableMs /** * Destroy QInfo object - * - * @param qinfo - * @return + * @param qinfo qhandle + * @param fp destroy callback function, while the qhandle is destoried, invoke the fp + * @param param free callback params */ -void qDestroyQueryInfo(qinfo_t qinfo); +void qDestroyQueryInfo(qinfo_t qinfo, void (*fp)(void*), void* param); /** * the main query execution function, including query on both table and multitables, @@ -45,7 +45,7 @@ void qDestroyQueryInfo(qinfo_t qinfo); * @param qinfo * @return */ -void qTableQuery(qinfo_t qinfo); +void qTableQuery(qinfo_t qinfo, void (*fp)(void*), void* param); /** * Retrieve the produced results information, if current query is not paused or completed, @@ -80,9 +80,12 @@ bool qHasMoreResultsToRetrieve(qinfo_t qinfo); /** * kill current ongoing query and free query handle automatically - * @param qinfo + * @param qinfo qhandle + * @param fp destroy callback function, while the qhandle is destoried, invoke the fp + * @param param free callback params + * @return */ -int32_t qKillQuery(qinfo_t qinfo); +int32_t qKillQuery(qinfo_t qinfo, void (*fp)(void*), void* param); #ifdef __cplusplus } diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 5e0dd2a8be8918a8beb0806079a987030865df27..bb6eaa68e5a0159c6e1cdeeec848af839e7d1bc2 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -5981,7 +5981,7 @@ static void doDestoryQueryInfo(SQInfo* pQInfo) { freeQInfo(pQInfo); } -void qDestroyQueryInfo(qinfo_t qHandle) { +void qDestroyQueryInfo(qinfo_t qHandle, void (*fp)(void*), void* param) { SQInfo* pQInfo = (SQInfo*) qHandle; if (!isValidQInfo(pQInfo)) { return; @@ -5992,10 +5992,14 @@ void qDestroyQueryInfo(qinfo_t qHandle) { if (ref == 0) { doDestoryQueryInfo(pQInfo); + + if (fp != NULL) { + fp(param); + } } } -void qTableQuery(qinfo_t qinfo) { +void qTableQuery(qinfo_t qinfo, void (*fp)(void*), void* param) { SQInfo *pQInfo = (SQInfo *)qinfo; if (pQInfo == NULL || pQInfo->signature != pQInfo) { @@ -6005,7 +6009,7 @@ void qTableQuery(qinfo_t qinfo) { if (isQueryKilled(pQInfo)) { qTrace("QInfo:%p it is already killed, abort", pQInfo); - qDestroyQueryInfo(pQInfo); + qDestroyQueryInfo(pQInfo, fp, param); return; } @@ -6021,7 +6025,7 @@ void qTableQuery(qinfo_t qinfo) { } sem_post(&pQInfo->dataReady); - qDestroyQueryInfo(pQInfo); + qDestroyQueryInfo(pQInfo, fp, param); } int32_t qRetrieveQueryResultInfo(qinfo_t qinfo) { @@ -6114,7 +6118,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co return code; } -int32_t qKillQuery(qinfo_t qinfo) { +int32_t qKillQuery(qinfo_t qinfo, void (*fp)(void*), void* param) { SQInfo *pQInfo = (SQInfo *)qinfo; if (pQInfo == NULL || !isValidQInfo(pQInfo)) { @@ -6122,7 +6126,7 @@ int32_t qKillQuery(qinfo_t qinfo) { } setQueryKilled(pQInfo); - qDestroyQueryInfo(pQInfo); + qDestroyQueryInfo(pQInfo, fp, param); return TSDB_CODE_SUCCESS; } diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index 1e770d8d277db74194b134d2cab35a59ef7eff22..f9dcd5e6e0bc0fe302877b6aa0d7cda1a9b2522c 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -89,7 +89,10 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { vWarn("QInfo:%p connection %p broken, kill query", killQueryMsg->qhandle, pReadMsg->rpcMsg.handle); assert(pReadMsg->rpcMsg.contLen > 0 && killQueryMsg->free == 1); - qKillQuery((qinfo_t) killQueryMsg->qhandle); + // this message arrived here by means of the query message, so release the vnode is necessary + qKillQuery((qinfo_t) killQueryMsg->qhandle, vnodeRelease, pVnode); + vnodeRelease(pVnode); + return TSDB_CODE_TSC_QUERY_CANCELLED; // todo change the error code } @@ -112,8 +115,8 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; //NOTE: there two refcount, needs to kill twice, todo refactor - qKillQuery(pQInfo); - qKillQuery(pQInfo); + qKillQuery(pQInfo, vnodeRelease, pVnode); + qKillQuery(pQInfo, vnodeRelease, pVnode); return pRsp->code; } @@ -128,7 +131,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { if (pQInfo != NULL) { vTrace("vgId:%d, QInfo:%p, do qTableQuery", pVnode->vgId, pQInfo); - qTableQuery(pQInfo); // do execute query + qTableQuery(pQInfo, vnodeRelease, pVnode); // do execute query } return code; @@ -146,7 +149,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { if (pRetrieve->free == 1) { vTrace("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, pQInfo); - int32_t ret = qKillQuery(pQInfo); + int32_t ret = qKillQuery(pQInfo, vnodeRelease, pVnode); pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); pRet->len = sizeof(SRetrieveTableRsp); @@ -175,8 +178,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { pRet->qhandle = pQInfo; code = TSDB_CODE_VND_ACTION_NEED_REPROCESSED; } else { // no further execution invoked, release the ref to vnode - qDestroyQueryInfo(pQInfo); - vnodeRelease(pVnode); + qDestroyQueryInfo(pQInfo, vnodeRelease, pVnode); } }