From 315fa7404247750306afc9450699cac36f3cf761 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 13 Jun 2020 11:58:58 +0800 Subject: [PATCH] [td-225] do not execute query if the link is already broken. --- src/inc/query.h | 3 ++- src/vnode/src/vnodeRead.c | 16 +++++++++++++--- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/src/inc/query.h b/src/inc/query.h index 10ee0249b6..49ee5248f5 100644 --- a/src/inc/query.h +++ b/src/inc/query.h @@ -70,7 +70,8 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo); int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp** pRsp, int32_t* contLen); /** - * Decide if more results will be produced or not + * Decide if more results will be produced or not, NOTE: this function will increase the ref count of QInfo, + * so it can be only called once for each retrieve * * @param qinfo * @return diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index d6227f4270..1e770d8d27 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -61,7 +61,7 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) { // notify connection(handle) that current qhandle is created, if current connection from // client is broken, the query needs to be killed immediately. -static void vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId) { +static int32_t vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId) { SRetrieveTableMsg* killQueryMsg = rpcMallocCont(sizeof(SRetrieveTableMsg)); killQueryMsg->qhandle = htobe64((uint64_t) qhandle); killQueryMsg->free = htons(1); @@ -69,7 +69,7 @@ static void vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId) killQueryMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg)); vTrace("QInfo:%p register qhandle to connect:%p", qhandle, handle); - rpcReportProgress(handle, (char*) killQueryMsg, sizeof(SRetrieveTableMsg)); + return rpcReportProgress(handle, (char*) killQueryMsg, sizeof(SRetrieveTableMsg)); } static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { @@ -106,7 +106,17 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { pRet->len = sizeof(SQueryTableRsp); pRet->rsp = pRsp; - vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, pQInfo, pVnode->vgId); + // current connect is broken + if (vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, pQInfo, pVnode->vgId) != TSDB_CODE_SUCCESS) { + vError("vgId:%d, QInfo:%p, dnode query discarded since link is broken, %p", pVnode->vgId, pQInfo, pReadMsg->rpcMsg.handle); + pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; + + //NOTE: there two refcount, needs to kill twice, todo refactor + qKillQuery(pQInfo); + qKillQuery(pQInfo); + + return pRsp->code; + } vTrace("vgId:%d, QInfo:%p, dnode query msg disposed", pVnode->vgId, pQInfo); } else { -- GitLab