提交 1e18ba19 编写于 作者: H Haojun Liao

fix(query): set results for local query in async APIs.

上级 5f93b8db
......@@ -13,6 +13,9 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_COMMAND_H
#define TDENGINE_COMMAND_H
#include "cmdnodes.h"
#include "tmsg.h"
#include "plannodes.h"
......@@ -27,4 +30,4 @@ int32_t qExecExplainEnd(SExplainCtx *pCtx, SRetrieveTableRsp **pRsp);
int32_t qExplainUpdateExecInfo(SExplainCtx *pCtx, SExplainRsp *pRspMsg, int32_t groupId, SRetrieveTableRsp **pRsp);
void qExplainFreeCtx(SExplainCtx *pCtx);
#endif
......@@ -843,19 +843,25 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
pRequest->body.param = param;
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
if (taos_num_fields(pRequest) == 0) {
// this query has no results or error exists, return directly
if (taos_num_fields(pRequest) == 0 || pRequest->code != TSDB_CODE_SUCCESS) {
pResultInfo->numOfRows = 0;
pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
return;
}
if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
// All data has returned to App already, no need to try again
if (pResultInfo->completed) {
pResultInfo->numOfRows = 0;
pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
return;
}
// all data has returned to App already, no need to try again
if ((pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) && pResultInfo->completed) {
pResultInfo->numOfRows = 0;
pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
return;
}
// it is a local executed query, no need to do async fetch
if (pResultInfo->current < pResultInfo->numOfRows && pRequest->body.queryJob == 0) {
pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
return;
}
schedulerAsyncFetchRows(pRequest->body.queryJob, fetchCallback, pRequest);
......@@ -864,14 +870,14 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
void taos_fetch_raw_block_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
ASSERT(res != NULL && fp != NULL);
ASSERT(TD_RES_QUERY(res));
SRequestObj *pRequest = res;
pRequest->body.resInfo.convertUcs4 = false;
SRequestObj *pRequest = res;
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
// set the current block is all consumed
pResultInfo->current = pResultInfo->numOfRows;
pResultInfo->convertUcs4 = false;
taos_fetch_rows_a(res, fp, param);
}
......
......@@ -565,7 +565,6 @@ static int32_t createSelectResultDataBlock(SNodeList* pProjects, SSDataBlock** p
}
int32_t buildSelectResultDataBlock(SNodeList* pProjects, SSDataBlock* pBlock) {
int32_t numOfCols = LIST_LENGTH(pProjects);
blockDataEnsureCapacity(pBlock, 1);
int32_t index = 0;
......@@ -579,7 +578,6 @@ int32_t buildSelectResultDataBlock(SNodeList* pProjects, SSDataBlock* pBlock) {
}
pBlock->info.rows = 1;
return TSDB_CODE_SUCCESS;
}
......
......@@ -13,13 +13,10 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "catalog.h"
#include "command.h"
#include "query.h"
#include "schedulerInt.h"
#include "tmsg.h"
#include "tref.h"
#include "trpc.h"
SSchedulerMgmt schMgmt = {
.jobRef = -1,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册