提交 8f38c121 编写于 作者: H Haojun Liao

[td-225] support query cancel.

上级 bc1ba128
...@@ -12,8 +12,8 @@ ...@@ -12,8 +12,8 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "qfill.h"
#include "os.h" #include "os.h"
#include "qfill.h"
#include "hash.h" #include "hash.h"
#include "hashfunc.h" #include "hashfunc.h"
...@@ -5822,20 +5822,39 @@ _over: ...@@ -5822,20 +5822,39 @@ _over:
//pQInfo already freed in initQInfo, but *pQInfo may not pointer to null; //pQInfo already freed in initQInfo, but *pQInfo may not pointer to null;
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
*pQInfo = NULL; *pQInfo = NULL;
} else {
SQInfo* pq = (SQInfo*) (*pQInfo);
T_REF_INC(pq);
T_REF_INC(pq);
} }
// if failed to add ref for all meters in this query, abort current query // if failed to add ref for all meters in this query, abort current query
return code; return code;
} }
void qDestroyQueryInfo(qinfo_t pQInfo) { static void doDestoryQueryInfo(SQInfo* pQInfo) {
assert(pQInfo != NULL);
qTrace("QInfo:%p query completed", pQInfo); qTrace("QInfo:%p query completed", pQInfo);
queryCostStatis(pQInfo); // print the query cost summary
// print the query cost summary
queryCostStatis(pQInfo);
freeQInfo(pQInfo); freeQInfo(pQInfo);
} }
void qDestroyQueryInfo(qinfo_t qHandle) {
SQInfo* pQInfo = (SQInfo*) qHandle;
if (!isValidQInfo(pQInfo)) {
return;
}
// set the query is cancelled
setQueryKilled(pQInfo);
int16_t ref = T_REF_DEC(pQInfo);
if (ref == 0) {
doDestoryQueryInfo(pQInfo);
}
}
void qTableQuery(qinfo_t qinfo) { void qTableQuery(qinfo_t qinfo) {
SQInfo *pQInfo = (SQInfo *)qinfo; SQInfo *pQInfo = (SQInfo *)qinfo;
...@@ -5846,6 +5865,11 @@ void qTableQuery(qinfo_t qinfo) { ...@@ -5846,6 +5865,11 @@ void qTableQuery(qinfo_t qinfo) {
if (isQueryKilled(pQInfo)) { if (isQueryKilled(pQInfo)) {
qTrace("QInfo:%p it is already killed, abort", pQInfo); qTrace("QInfo:%p it is already killed, abort", pQInfo);
int16_t ref = T_REF_DEC(pQInfo);
if (ref == 0) {
doDestoryQueryInfo(pQInfo);
}
return; return;
} }
...@@ -5861,7 +5885,10 @@ void qTableQuery(qinfo_t qinfo) { ...@@ -5861,7 +5885,10 @@ void qTableQuery(qinfo_t qinfo) {
} }
sem_post(&pQInfo->dataReady); sem_post(&pQInfo->dataReady);
// vnodeDecRefCount(pQInfo); int16_t ref = T_REF_DEC(pQInfo);
if (ref == 0) {
doDestoryQueryInfo(pQInfo);
}
} }
int32_t qRetrieveQueryResultInfo(qinfo_t qinfo) { int32_t qRetrieveQueryResultInfo(qinfo_t qinfo) {
...@@ -5887,20 +5914,27 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo) { ...@@ -5887,20 +5914,27 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo) {
bool qHasMoreResultsToRetrieve(qinfo_t qinfo) { bool qHasMoreResultsToRetrieve(qinfo_t qinfo) {
SQInfo *pQInfo = (SQInfo *)qinfo; SQInfo *pQInfo = (SQInfo *)qinfo;
if (pQInfo == NULL || pQInfo->signature != pQInfo || pQInfo->code != TSDB_CODE_SUCCESS) { if (isValidQInfo(pQInfo) || pQInfo->code != TSDB_CODE_SUCCESS) {
return false; return false;
} }
SQuery *pQuery = pQInfo->runtimeEnv.pQuery; SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
bool ret = false;
if (Q_STATUS_EQUAL(pQuery->status, QUERY_OVER)) { if (Q_STATUS_EQUAL(pQuery->status, QUERY_OVER)) {
return false; ret = false;
} else if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) { } else if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) {
return true; ret = true;
} else if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { } else if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
return true; ret = true;
} else { } else {
assert(0); assert(0);
} }
if (ret) {
T_REF_INC(pQInfo);
}
return ret;
} }
int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *contLen) { int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *contLen) {
......
...@@ -59,6 +59,18 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) { ...@@ -59,6 +59,18 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) {
return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg); return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg);
} }
// notify connection(handle) that current qhandle is created, if current connection from
// client is broken, the query needs to be killed immediately.
static void vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId) {
SRetrieveTableMsg* killQueryMsg = rpcMallocCont(sizeof(SRetrieveTableMsg));
killQueryMsg->qhandle = htobe64((uint64_t) qhandle);
killQueryMsg->free = htons(1);
killQueryMsg->header.vgId = htonl(vgId);
killQueryMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg));
rpcReportProgress(handle, (char*) killQueryMsg, sizeof(SRetrieveTableMsg));
}
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
void * pCont = pReadMsg->pCont; void * pCont = pReadMsg->pCont;
int32_t contLen = pReadMsg->contLen; int32_t contLen = pReadMsg->contLen;
...@@ -67,9 +79,23 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -67,9 +79,23 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
SQueryTableMsg* pQueryTableMsg = (SQueryTableMsg*) pCont; SQueryTableMsg* pQueryTableMsg = (SQueryTableMsg*) pCont;
memset(pRet, 0, sizeof(SRspRet)); memset(pRet, 0, sizeof(SRspRet));
int32_t code = TSDB_CODE_SUCCESS; // qHandle needs to be freed correctly
if (pReadMsg->rpcMsg.code != TSDB_CODE_SUCCESS) {
assert(pReadMsg->rpcMsg.contLen > 0);
SRetrieveTableMsg* killQueryMsg = (SRetrieveTableMsg*) pReadMsg->pCont;
killQueryMsg->free = htons(killQueryMsg->free);
killQueryMsg->qhandle = htobe64(killQueryMsg->qhandle);
assert(killQueryMsg->free == 1);
qDestroyQueryInfo((qinfo_t) killQueryMsg->qhandle);
return TSDB_CODE_SUCCESS;
}
int32_t code = TSDB_CODE_SUCCESS;
qinfo_t pQInfo = NULL; qinfo_t pQInfo = NULL;
if (contLen != 0) { if (contLen != 0) {
code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo); code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo);
...@@ -79,7 +105,9 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -79,7 +105,9 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
pRet->len = sizeof(SQueryTableRsp); pRet->len = sizeof(SQueryTableRsp);
pRet->rsp = pRsp; pRet->rsp = pRsp;
vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, pQInfo, pVnode->vgId);
vTrace("vgId:%d, QInfo:%p, dnode query msg disposed", pVnode->vgId, pQInfo); vTrace("vgId:%d, QInfo:%p, dnode query msg disposed", pVnode->vgId, pQInfo);
} else { } else {
assert(pCont != NULL); assert(pCont != NULL);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册