diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index 9938b3474197ad3dc9a940463c33c518569f03d7..57dbf4dede3edc65a5fe828bbb855f38b5f23eab 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -94,10 +94,10 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { } int32_t code = TSDB_CODE_SUCCESS; - qinfo_t pQInfo = NULL; void** handle = NULL; if (contLen != 0) { + qinfo_t pQInfo = NULL; code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, pVnode, NULL, &pQInfo); SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp)); @@ -116,19 +116,19 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { qKillQuery(pQInfo); qKillQuery(pQInfo); - pQInfo = NULL; } else { assert(*handle == pQInfo); pRsp->qhandle = htobe64((uint64_t) (handle)); } + pQInfo = NULL; if (handle != NULL && vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, handle, pVnode->vgId) != TSDB_CODE_SUCCESS) { - vError("vgId:%d, QInfo:%p, query discarded since link is broken, %p", pVnode->vgId, pQInfo, pReadMsg->rpcMsg.handle); + vError("vgId:%d, QInfo:%p, query discarded since link is broken, %p", pVnode->vgId, *handle, pReadMsg->rpcMsg.handle); pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; // NOTE: there two refcount, needs to kill twice // query has not been put into qhandle pool, kill it directly. - qKillQuery(pQInfo); + qKillQuery(*handle); qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true); return pRsp->code; } @@ -139,16 +139,18 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { vDebug("vgId:%d, QInfo:%p, dnode query msg disposed", vgId, pQInfo); } else { assert(pCont != NULL); - pQInfo = *(void**)(pCont); - handle = pCont; - code = TSDB_CODE_VND_ACTION_IN_PROGRESS; - - vDebug("vgId:%d, QInfo:%p, dnode query msg in progress", pVnode->vgId, pQInfo); + handle = qAcquireQInfo(pVnode->qMgmt, (void**) pCont); + if (handle == NULL) { + vWarn("QInfo:%p invalid qhandle in continuing exec query, conn:%p", *(void**) pCont, pReadMsg->rpcMsg.handle); + code = TSDB_CODE_QRY_INVALID_QHANDLE; + } else { + vDebug("vgId:%d, QInfo:%p, dnode query msg in progress", pVnode->vgId, *(void**) pCont); + code = TSDB_CODE_VND_ACTION_IN_PROGRESS; + } } - if (pQInfo != NULL) { - qTableQuery(pQInfo); // do execute query - assert(handle != NULL); + if (handle != NULL) { + qTableQuery(*handle); // do execute query qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false); } @@ -160,57 +162,64 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { SRspRet *pRet = &pReadMsg->rspRet; SRetrieveTableMsg *pRetrieve = pCont; - void **pQInfo = (void*) htobe64(pRetrieve->qhandle); + pRetrieve->qhandle = htobe64(pRetrieve->qhandle); pRetrieve->free = htons(pRetrieve->free); - vDebug("vgId:%d, QInfo:%p, retrieve msg is disposed", pVnode->vgId, *pQInfo); + vDebug("vgId:%d, QInfo:%p, retrieve msg is disposed", pVnode->vgId, *(void**) pRetrieve->qhandle); memset(pRet, 0, sizeof(SRspRet)); - int32_t ret = 0; - void** handle = qAcquireQInfo(pVnode->qMgmt, pQInfo); - if (handle == NULL || handle != pQInfo) { - ret = TSDB_CODE_QRY_INVALID_QHANDLE; + int32_t code = TSDB_CODE_SUCCESS; + void** handle = qAcquireQInfo(pVnode->qMgmt, (void**) pRetrieve->qhandle); + if (handle == NULL || handle != (void**) pRetrieve->qhandle) { + code = TSDB_CODE_QRY_INVALID_QHANDLE; + vDebug("vgId:%d, invalid qhandle in fetch result, QInfo:%p", pVnode->vgId, *(void**) pRetrieve->qhandle); + + pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); + pRet->len = sizeof(SRetrieveTableRsp); + + memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp)); + SRetrieveTableRsp* pRsp = pRet->rsp; + pRsp->numOfRows = 0; + pRsp->completed = true; + pRsp->useconds = 0; + + return code; } if (pRetrieve->free == 1) { - if (ret == TSDB_CODE_SUCCESS) { - vDebug("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, pQInfo); - qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true); - - pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); - pRet->len = sizeof(SRetrieveTableRsp); - - memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp)); - SRetrieveTableRsp* pRsp = pRet->rsp; - pRsp->numOfRows = 0; - pRsp->completed = true; - pRsp->useconds = 0; - } else { // todo handle error - qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true); - } - return ret; - } + vDebug("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, *handle); + qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true); - int32_t code = qRetrieveQueryResultInfo(*pQInfo); - if (code != TSDB_CODE_SUCCESS || ret != TSDB_CODE_SUCCESS) { - //TODO pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); + pRet->len = sizeof(SRetrieveTableRsp); + memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp)); + SRetrieveTableRsp* pRsp = pRet->rsp; + pRsp->numOfRows = 0; + pRsp->completed = true; + pRsp->useconds = 0; - } else { - // todo check code and handle error in build result set - code = qDumpRetrieveResult(*pQInfo, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len); - - if (qHasMoreResultsToRetrieve(*handle)) { - dnodePutItemIntoReadQueue(pVnode, handle); - pRet->qhandle = handle; - code = TSDB_CODE_SUCCESS; - } else { // no further execution invoked, release the ref to vnode - qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true); + return code; + } + + bool freeHandle = true; + code = qRetrieveQueryResultInfo(*handle); + if (code != TSDB_CODE_SUCCESS) { + //TODO handle malloc failure + pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); + memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp)); + } else { // if failed to dump result, free qhandle immediately + if ((code = qDumpRetrieveResult(*handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len)) == TSDB_CODE_SUCCESS) { + if (qHasMoreResultsToRetrieve(*handle)) { + dnodePutItemIntoReadQueue(pVnode, handle); + pRet->qhandle = handle; + freeHandle = false; + } } } + qReleaseQInfo(pVnode->qMgmt, (void**) &handle, freeHandle); return code; }