diff --git a/src/inc/query.h b/src/inc/query.h index cdadd4759fcfd2b8ae5b1ba8a431dbb56bfac08c..10ee0249b6730fc58881f2e47ef3425c74cfe36d 100644 --- a/src/inc/query.h +++ b/src/inc/query.h @@ -77,6 +77,12 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp** pRsp, int32_t* co */ bool qHasMoreResultsToRetrieve(qinfo_t qinfo); +/** + * kill current ongoing query and free query handle automatically + * @param qinfo + */ +int32_t qKillQuery(qinfo_t qinfo); + #ifdef __cplusplus } #endif diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 97d9efa6188cd6da634adeeb3bf87ff7f0746938..85d4acdfed399839b0a2efcb915e67a5ad19746d 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -5846,9 +5846,6 @@ void qDestroyQueryInfo(qinfo_t qHandle) { return; } - // set the query is cancelled - setQueryKilled(pQInfo); - int16_t ref = T_REF_DEC(pQInfo); if (ref == 0) { doDestoryQueryInfo(pQInfo); @@ -5865,11 +5862,7 @@ void qTableQuery(qinfo_t qinfo) { if (isQueryKilled(pQInfo)) { qTrace("QInfo:%p it is already killed, abort", pQInfo); - - int16_t ref = T_REF_DEC(pQInfo); - if (ref == 0) { - doDestoryQueryInfo(pQInfo); - } + qDestroyQueryInfo(pQInfo); return; } @@ -5885,10 +5878,7 @@ void qTableQuery(qinfo_t qinfo) { } sem_post(&pQInfo->dataReady); - int16_t ref = T_REF_DEC(pQInfo); - if (ref == 0) { - doDestoryQueryInfo(pQInfo); - } + qDestroyQueryInfo(pQInfo); } int32_t qRetrieveQueryResultInfo(qinfo_t qinfo) { @@ -5914,7 +5904,8 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo) { bool qHasMoreResultsToRetrieve(qinfo_t qinfo) { SQInfo *pQInfo = (SQInfo *)qinfo; - if (isValidQInfo(pQInfo) || pQInfo->code != TSDB_CODE_SUCCESS) { + if (!isValidQInfo(pQInfo) || pQInfo->code != TSDB_CODE_SUCCESS) { + qTrace("QInfo:%p invalid qhandle or error occurs, abort query, code:%x", pQInfo, pQInfo->code); return false; } @@ -5932,6 +5923,7 @@ bool qHasMoreResultsToRetrieve(qinfo_t qinfo) { if (ret) { T_REF_INC(pQInfo); + qTrace("QInfo:%p has more results waits for client retrieve", pQInfo); } return ret; @@ -5979,6 +5971,19 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co return code; } +int32_t qKillQuery(qinfo_t qinfo) { + SQInfo *pQInfo = (SQInfo *)qinfo; + + if (pQInfo == NULL || !isValidQInfo(pQInfo)) { + return TSDB_CODE_QRY_INVALID_QHANDLE; + } + + setQueryKilled(pQInfo); + qDestroyQueryInfo(pQInfo); + + return TSDB_CODE_SUCCESS; +} + static void buildTagQueryResult(SQInfo* pQInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery * pQuery = pRuntimeEnv->pQuery; diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index 4c36703e6b7f2dec5dc6866bdd1a5cb8a4f95bf0..9dce61704f9c6dcefa17cc5fb4fa7091418aa21e 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -80,14 +80,12 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { memset(pRet, 0, sizeof(SRspRet)); // qHandle needs to be freed correctly - if (pReadMsg->rpcMsg.code != TSDB_CODE_SUCCESS) { - assert(pReadMsg->rpcMsg.contLen > 0); - + if (pReadMsg->rpcMsg.code != TSDB_CODE_RPC_NETWORK_UNAVAIL) { SRetrieveTableMsg* killQueryMsg = (SRetrieveTableMsg*) pReadMsg->pCont; killQueryMsg->free = htons(killQueryMsg->free); killQueryMsg->qhandle = htobe64(killQueryMsg->qhandle); - assert(killQueryMsg->free == 1); + assert(pReadMsg->rpcMsg.contLen > 0 && killQueryMsg->free == 1); qDestroyQueryInfo((qinfo_t) killQueryMsg->qhandle); return TSDB_CODE_SUCCESS; @@ -130,10 +128,28 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { SRetrieveTableMsg *pRetrieve = pCont; void *pQInfo = (void*) htobe64(pRetrieve->qhandle); + pRetrieve->free = htons(pRetrieve->free); + memset(pRet, 0, sizeof(SRspRet)); + if (pRetrieve->free == 1) { + vTrace("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, pQInfo); + int32_t ret = qKillQuery(pQInfo); + + pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); + pRet->len = sizeof(SRetrieveTableRsp); + + memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp)); + SRetrieveTableRsp* pRsp = pRet->rsp; + pRsp->numOfRows = 0; + pRsp->completed = true; + pRsp->useconds = 0; + + return ret; + } + vTrace("vgId:%d, QInfo:%p, retrieve msg is received", pVnode->vgId, pQInfo); - + int32_t code = qRetrieveQueryResultInfo(pQInfo); if (code != TSDB_CODE_SUCCESS) { //TODO @@ -146,8 +162,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { if (qHasMoreResultsToRetrieve(pQInfo)) { pRet->qhandle = pQInfo; code = TSDB_CODE_VND_ACTION_NEED_REPROCESSED; - } else { - // no further execution invoked, release the ref to vnode + } else { // no further execution invoked, release the ref to vnode qDestroyQueryInfo(pQInfo); vnodeRelease(pVnode); } diff --git a/tests/examples/c/demo.c b/tests/examples/c/demo.c index 55a19eb5f90e27d9040e1d15c81909562b6c608b..5b54efe22d5560984febe981126963ac8b3c205f 100644 --- a/tests/examples/c/demo.c +++ b/tests/examples/c/demo.c @@ -115,15 +115,15 @@ int main(int argc, char *argv[]) { printf("success to connect to server\n"); // doQuery(taos, "select c1,count(*) from group_db0.group_mt0 where c1<8 group by c1"); - doQuery(taos, "select * from test.m1"); +// doQuery(taos, "select * from test.m1"); // multiThreadTest(1, taos); // doQuery(taos, "select tbname from test.m1"); // doQuery(taos, "select max(c1), min(c2), sum(c3), avg(c4), first(c7), last(c8), first(c9) from lm2_db0.lm2_stb0 where ts >= 1537146000000 and ts <= 1543145400000 and tbname in ('lm2_tb0') interval(1s) group by t1"); // doQuery(taos, "select max(c1), min(c2), sum(c3), avg(c4), first(c7), last(c8), first(c9) from lm2_db0.lm2_stb0 where ts >= 1537146000000 and ts <= 1543145400000 and tbname in ('lm2_tb0', 'lm2_tb1', 'lm2_tb2') interval(1s)"); -// for(int32_t i = 0; i < 100000; ++i) { -// doQuery(taos, "insert into t1 values(now, 2)"); -// } + for(int32_t i = 0; i < 200; ++i) { + doQuery(taos, "select * from lm2_db0.lm2_stb0"); + } // doQuery(taos, "create table t1(ts timestamp, k binary(12), f nchar(2))"); taos_close(taos);