提交 2c3f94cd 编写于 作者: H Haojun Liao

[td-225] support kill query

上级 8f38c121
...@@ -77,6 +77,12 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp** pRsp, int32_t* co ...@@ -77,6 +77,12 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp** pRsp, int32_t* co
*/ */
bool qHasMoreResultsToRetrieve(qinfo_t qinfo); bool qHasMoreResultsToRetrieve(qinfo_t qinfo);
/**
* kill current ongoing query and free query handle automatically
* @param qinfo
*/
int32_t qKillQuery(qinfo_t qinfo);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -5846,9 +5846,6 @@ void qDestroyQueryInfo(qinfo_t qHandle) { ...@@ -5846,9 +5846,6 @@ void qDestroyQueryInfo(qinfo_t qHandle) {
return; return;
} }
// set the query is cancelled
setQueryKilled(pQInfo);
int16_t ref = T_REF_DEC(pQInfo); int16_t ref = T_REF_DEC(pQInfo);
if (ref == 0) { if (ref == 0) {
doDestoryQueryInfo(pQInfo); doDestoryQueryInfo(pQInfo);
...@@ -5865,11 +5862,7 @@ void qTableQuery(qinfo_t qinfo) { ...@@ -5865,11 +5862,7 @@ void qTableQuery(qinfo_t qinfo) {
if (isQueryKilled(pQInfo)) { if (isQueryKilled(pQInfo)) {
qTrace("QInfo:%p it is already killed, abort", pQInfo); qTrace("QInfo:%p it is already killed, abort", pQInfo);
qDestroyQueryInfo(pQInfo);
int16_t ref = T_REF_DEC(pQInfo);
if (ref == 0) {
doDestoryQueryInfo(pQInfo);
}
return; return;
} }
...@@ -5885,10 +5878,7 @@ void qTableQuery(qinfo_t qinfo) { ...@@ -5885,10 +5878,7 @@ void qTableQuery(qinfo_t qinfo) {
} }
sem_post(&pQInfo->dataReady); sem_post(&pQInfo->dataReady);
int16_t ref = T_REF_DEC(pQInfo); qDestroyQueryInfo(pQInfo);
if (ref == 0) {
doDestoryQueryInfo(pQInfo);
}
} }
int32_t qRetrieveQueryResultInfo(qinfo_t qinfo) { int32_t qRetrieveQueryResultInfo(qinfo_t qinfo) {
...@@ -5914,7 +5904,8 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo) { ...@@ -5914,7 +5904,8 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo) {
bool qHasMoreResultsToRetrieve(qinfo_t qinfo) { bool qHasMoreResultsToRetrieve(qinfo_t qinfo) {
SQInfo *pQInfo = (SQInfo *)qinfo; SQInfo *pQInfo = (SQInfo *)qinfo;
if (isValidQInfo(pQInfo) || pQInfo->code != TSDB_CODE_SUCCESS) { if (!isValidQInfo(pQInfo) || pQInfo->code != TSDB_CODE_SUCCESS) {
qTrace("QInfo:%p invalid qhandle or error occurs, abort query, code:%x", pQInfo, pQInfo->code);
return false; return false;
} }
...@@ -5932,6 +5923,7 @@ bool qHasMoreResultsToRetrieve(qinfo_t qinfo) { ...@@ -5932,6 +5923,7 @@ bool qHasMoreResultsToRetrieve(qinfo_t qinfo) {
if (ret) { if (ret) {
T_REF_INC(pQInfo); T_REF_INC(pQInfo);
qTrace("QInfo:%p has more results waits for client retrieve", pQInfo);
} }
return ret; return ret;
...@@ -5979,6 +5971,19 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co ...@@ -5979,6 +5971,19 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
return code; return code;
} }
int32_t qKillQuery(qinfo_t qinfo) {
SQInfo *pQInfo = (SQInfo *)qinfo;
if (pQInfo == NULL || !isValidQInfo(pQInfo)) {
return TSDB_CODE_QRY_INVALID_QHANDLE;
}
setQueryKilled(pQInfo);
qDestroyQueryInfo(pQInfo);
return TSDB_CODE_SUCCESS;
}
static void buildTagQueryResult(SQInfo* pQInfo) { static void buildTagQueryResult(SQInfo* pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery; SQuery * pQuery = pRuntimeEnv->pQuery;
......
...@@ -80,14 +80,12 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -80,14 +80,12 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
memset(pRet, 0, sizeof(SRspRet)); memset(pRet, 0, sizeof(SRspRet));
// qHandle needs to be freed correctly // qHandle needs to be freed correctly
if (pReadMsg->rpcMsg.code != TSDB_CODE_SUCCESS) { if (pReadMsg->rpcMsg.code != TSDB_CODE_RPC_NETWORK_UNAVAIL) {
assert(pReadMsg->rpcMsg.contLen > 0);
SRetrieveTableMsg* killQueryMsg = (SRetrieveTableMsg*) pReadMsg->pCont; SRetrieveTableMsg* killQueryMsg = (SRetrieveTableMsg*) pReadMsg->pCont;
killQueryMsg->free = htons(killQueryMsg->free); killQueryMsg->free = htons(killQueryMsg->free);
killQueryMsg->qhandle = htobe64(killQueryMsg->qhandle); killQueryMsg->qhandle = htobe64(killQueryMsg->qhandle);
assert(killQueryMsg->free == 1); assert(pReadMsg->rpcMsg.contLen > 0 && killQueryMsg->free == 1);
qDestroyQueryInfo((qinfo_t) killQueryMsg->qhandle); qDestroyQueryInfo((qinfo_t) killQueryMsg->qhandle);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -130,10 +128,28 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -130,10 +128,28 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
SRetrieveTableMsg *pRetrieve = pCont; SRetrieveTableMsg *pRetrieve = pCont;
void *pQInfo = (void*) htobe64(pRetrieve->qhandle); void *pQInfo = (void*) htobe64(pRetrieve->qhandle);
pRetrieve->free = htons(pRetrieve->free);
memset(pRet, 0, sizeof(SRspRet)); memset(pRet, 0, sizeof(SRspRet));
if (pRetrieve->free == 1) {
vTrace("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, pQInfo);
int32_t ret = qKillQuery(pQInfo);
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
pRet->len = sizeof(SRetrieveTableRsp);
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
SRetrieveTableRsp* pRsp = pRet->rsp;
pRsp->numOfRows = 0;
pRsp->completed = true;
pRsp->useconds = 0;
return ret;
}
vTrace("vgId:%d, QInfo:%p, retrieve msg is received", pVnode->vgId, pQInfo); vTrace("vgId:%d, QInfo:%p, retrieve msg is received", pVnode->vgId, pQInfo);
int32_t code = qRetrieveQueryResultInfo(pQInfo); int32_t code = qRetrieveQueryResultInfo(pQInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
//TODO //TODO
...@@ -146,8 +162,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -146,8 +162,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
if (qHasMoreResultsToRetrieve(pQInfo)) { if (qHasMoreResultsToRetrieve(pQInfo)) {
pRet->qhandle = pQInfo; pRet->qhandle = pQInfo;
code = TSDB_CODE_VND_ACTION_NEED_REPROCESSED; code = TSDB_CODE_VND_ACTION_NEED_REPROCESSED;
} else { } else { // no further execution invoked, release the ref to vnode
// no further execution invoked, release the ref to vnode
qDestroyQueryInfo(pQInfo); qDestroyQueryInfo(pQInfo);
vnodeRelease(pVnode); vnodeRelease(pVnode);
} }
......
...@@ -115,15 +115,15 @@ int main(int argc, char *argv[]) { ...@@ -115,15 +115,15 @@ int main(int argc, char *argv[]) {
printf("success to connect to server\n"); printf("success to connect to server\n");
// doQuery(taos, "select c1,count(*) from group_db0.group_mt0 where c1<8 group by c1"); // doQuery(taos, "select c1,count(*) from group_db0.group_mt0 where c1<8 group by c1");
doQuery(taos, "select * from test.m1"); // doQuery(taos, "select * from test.m1");
// multiThreadTest(1, taos); // multiThreadTest(1, taos);
// doQuery(taos, "select tbname from test.m1"); // doQuery(taos, "select tbname from test.m1");
// doQuery(taos, "select max(c1), min(c2), sum(c3), avg(c4), first(c7), last(c8), first(c9) from lm2_db0.lm2_stb0 where ts >= 1537146000000 and ts <= 1543145400000 and tbname in ('lm2_tb0') interval(1s) group by t1"); // doQuery(taos, "select max(c1), min(c2), sum(c3), avg(c4), first(c7), last(c8), first(c9) from lm2_db0.lm2_stb0 where ts >= 1537146000000 and ts <= 1543145400000 and tbname in ('lm2_tb0') interval(1s) group by t1");
// doQuery(taos, "select max(c1), min(c2), sum(c3), avg(c4), first(c7), last(c8), first(c9) from lm2_db0.lm2_stb0 where ts >= 1537146000000 and ts <= 1543145400000 and tbname in ('lm2_tb0', 'lm2_tb1', 'lm2_tb2') interval(1s)"); // doQuery(taos, "select max(c1), min(c2), sum(c3), avg(c4), first(c7), last(c8), first(c9) from lm2_db0.lm2_stb0 where ts >= 1537146000000 and ts <= 1543145400000 and tbname in ('lm2_tb0', 'lm2_tb1', 'lm2_tb2') interval(1s)");
// for(int32_t i = 0; i < 100000; ++i) { for(int32_t i = 0; i < 200; ++i) {
// doQuery(taos, "insert into t1 values(now, 2)"); doQuery(taos, "select * from lm2_db0.lm2_stb0");
// } }
// doQuery(taos, "create table t1(ts timestamp, k binary(12), f nchar(2))"); // doQuery(taos, "create table t1(ts timestamp, k binary(12), f nchar(2))");
taos_close(taos); taos_close(taos);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册