diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index bca07e71505dd151c1532c9d92cd90b1d39d036d..5aa453f30e50f74a5fd596d6feab41608d945d4d 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -6356,7 +6356,6 @@ bool qTableQuery(qinfo_t qinfo) { pthread_mutex_unlock(&pQInfo->lock); return buildRes; -// sem_post(&pQInfo->dataReady); } int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContext) { diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index 2a4ca0e663d1327ed15e22eb8d3398a724e5a942..066770e1bb9de5d463a1abd2b4078c3d41f9c574 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -77,6 +77,39 @@ static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void *qhandle) { taosWriteQitem(pVnode->rqueue, TAOS_QTYPE_QUERY, pRead); } +static int32_t vnodeDumpQueryResult(SRspRet *pRet, void* pVnode, void* handle, bool* freeHandle) { + bool continueExec = false; + + int32_t code = TSDB_CODE_SUCCESS; + if ((code = qDumpRetrieveResult(handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len, &continueExec)) == TSDB_CODE_SUCCESS) { + if (continueExec) { + vDebug("QInfo:%p add to query task queue for exec", handle); + vnodePutItemIntoReadQueue(pVnode, handle); + pRet->qhandle = handle; + *freeHandle = false; + } else { + vDebug("QInfo:%p exec completed", handle); + *freeHandle = true; + } + } else { + pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); + memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp)); + *freeHandle = true; + } + + return code; +} + +static void vnodeBuildNoResultQueryRsp(SRspRet* pRet) { + pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); + pRet->len = sizeof(SRetrieveTableRsp); + + memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp)); + SRetrieveTableRsp* pRsp = pRet->rsp; + + pRsp->completed = true; +} + static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { void *pCont = pReadMsg->pCont; int32_t contLen = pReadMsg->contLen; @@ -161,37 +194,27 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { code = TSDB_CODE_QRY_INVALID_QHANDLE; } else { vDebug("vgId:%d, QInfo:%p, dnode continue exec query", pVnode->vgId, (void*) pCont); + + bool freehandle = false; bool buildRes = qTableQuery(*handle); // do execute query - if (buildRes) { // build result rsp + // build query rsp + if (buildRes) { // update the connection info according to the retrieve connection pReadMsg->rpcMsg.handle = qGetResultRetrieveMsg(*handle); assert(pReadMsg->rpcMsg.handle != NULL); vDebug("vgId:%d, QInfo:%p, start to build result rsp after query paused, %p", pVnode->vgId, *handle, pReadMsg->rpcMsg.handle); + code = vnodeDumpQueryResult(&pReadMsg->rspRet, pVnode, *handle, &freehandle); - pRet = &pReadMsg->rspRet; -// code = TSDB_CODE_QRY_HAS_RSP; - - bool continueExec = false; - if ((code = qDumpRetrieveResult(*handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len, &continueExec)) == TSDB_CODE_SUCCESS) { - - if (continueExec) { - vTrace("QInfo:%p add to queue for further exec", *handle); - vnodePutItemIntoReadQueue(pVnode, *handle); - pRet->qhandle = *handle; -// code = TSDB_CODE_SUCCESS; - } else { - vDebug("QInfo:%p query completed", *handle); - } - } else { // todo handle error + // todo test the error code case + if (code == TSDB_CODE_SUCCESS) { + code = TSDB_CODE_QRY_HAS_RSP; } - - code = TSDB_CODE_QRY_HAS_RSP; } - } - qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false); + qReleaseQInfo(pVnode->qMgmt, (void**) &handle, freehandle); + } } return code; @@ -214,16 +237,8 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { if (handle == NULL || (*handle) != (void*) pRetrieve->qhandle) { code = TSDB_CODE_QRY_INVALID_QHANDLE; vDebug("vgId:%d, invalid qhandle in fetch result, QInfo:%p", pVnode->vgId, (void*) pRetrieve->qhandle); - - pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); - pRet->len = sizeof(SRetrieveTableRsp); - - memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp)); - SRetrieveTableRsp* pRsp = pRet->rsp; - pRsp->numOfRows = 0; - pRsp->useconds = 0; - pRsp->completed = true; - + + vnodeBuildNoResultQueryRsp(pRet); return code; } @@ -232,15 +247,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { qKillQuery(*handle); qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true); - pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); - pRet->len = sizeof(SRetrieveTableRsp); - - memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp)); - SRetrieveTableRsp* pRsp = pRet->rsp; - pRsp->numOfRows = 0; - pRsp->completed = true; - pRsp->useconds = 0; - + vnodeBuildNoResultQueryRsp(pRet); return code; } @@ -258,17 +265,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { return TSDB_CODE_QRY_NOT_READY; } - bool continueExec = false; - if ((code = qDumpRetrieveResult(*handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len, &continueExec)) == TSDB_CODE_SUCCESS) { - if (continueExec) { - vnodePutItemIntoReadQueue(pVnode, *handle); - pRet->qhandle = *handle; - freeHandle = false; - } - } else { - pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); - memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp)); - } + code = vnodeDumpQueryResult(pRet, pVnode, *handle, &freeHandle); } qReleaseQInfo(pVnode->qMgmt, (void**) &handle, freeHandle);