diff --git a/include/common/tmsg.h b/include/common/tmsg.h index bfb80ec8f845ccf11b0600da93305c3c65b616c0..9d001c95347dc9e367ab79affd6764d3a79e1f8b 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1364,6 +1364,7 @@ typedef struct { int8_t compressed; int8_t streamBlockType; int32_t compLen; + int32_t numOfBlocks; int32_t numOfRows; int32_t numOfCols; int64_t skey; diff --git a/include/libs/executor/dataSinkMgt.h b/include/libs/executor/dataSinkMgt.h index 47177dc11b5669511d92202605c002e34c64d589..71105d88eb52cb7da920ebb967a892f59cde3894 100644 --- a/include/libs/executor/dataSinkMgt.h +++ b/include/libs/executor/dataSinkMgt.h @@ -67,6 +67,7 @@ typedef struct SInputData { } SInputData; typedef struct SOutputData { + int32_t numOfBlocks; int32_t numOfRows; int32_t numOfCols; int8_t compressed; diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 8f036714c9cc761ea997d84e456b6946805e5d96..729ac474e4720a13a2da8d463820d05db72970f0 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -35,6 +35,7 @@ extern "C" { #define QW_DEFAULT_SHORT_RUN_TIMES 2 #define QW_DEFAULT_HEARTBEAT_MSEC 5000 #define QW_SCH_TIMEOUT_MSEC 180000 +#define QW_MIN_RES_ROWS 4096 enum { QW_PHASE_PRE_QUERY = 1, @@ -135,6 +136,7 @@ typedef struct SQWTaskCtx { int32_t msgType; int32_t fetchType; int32_t execId; + int32_t level; bool queryRsped; bool queryEnd; diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index e8ffd98153a2e7ac32e4e7500cb0e1a5d14da0e4..63a3c1bfeaf86a3cba3de1f2cb5ea5a72f50a666 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -12,14 +12,16 @@ int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp) { int32_t msgSize = sizeof(SRetrieveTableRsp) + length; - SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(msgSize); + SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcReallocCont(*rsp, msgSize); if (NULL == pRsp) { qError("rpcMallocCont %d failed", msgSize); QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - memset(pRsp, 0, sizeof(SRetrieveTableRsp)); - + if (NULL == *rsp) { + memset(pRsp, 0, sizeof(SRetrieveTableRsp)); + } + *rsp = pRsp; return TSDB_CODE_SUCCESS; @@ -35,6 +37,7 @@ void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete) rsp->compLen = htonl(len); rsp->numOfRows = htonl(input->numOfRows); rsp->numOfCols = htonl(input->numOfCols); + rsp->numOfBlocks = htonl(input->numOfBlocks); } void qwFreeFetchRsp(void *msg) { diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 36d85f1f123cc8864d8678d07f174308a38ba729..d93c07ce1ec26c7dd25ad737e35044eca7053c36 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -203,57 +203,88 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, SRetrieveTableRsp *rsp = NULL; bool queryEnd = false; int32_t code = 0; + SOutputData output = {0}; - dsGetDataLength(ctx->sinkHandle, &len, &queryEnd); + *dataLen = 0; - if (len < 0) { - QW_TASK_ELOG("invalid length from dsGetDataLength, length:%d", len); - QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); - } + while (true) { + dsGetDataLength(ctx->sinkHandle, &len, &queryEnd); - if (len == 0) { - if (queryEnd) { - code = dsGetDataBlock(ctx->sinkHandle, pOutput); - if (code) { - QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code)); - QW_ERR_RET(code); - } + if (len < 0) { + QW_TASK_ELOG("invalid length from dsGetDataLength, length:%d", len); + QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } - QW_TASK_DLOG_E("no data in sink and query end"); + if (len == 0) { + if (queryEnd) { + code = dsGetDataBlock(ctx->sinkHandle, &output); + if (code) { + QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code)); + QW_ERR_RET(code); + } - qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC); - QW_ERR_RET(qwMallocFetchRsp(len, &rsp)); + QW_TASK_DLOG("no more data in sink and query end, fetched blocks %d rows %d", pOutput->numOfBlocks, pOutput->numOfRows); - *rspMsg = rsp; - *dataLen = 0; - return TSDB_CODE_SUCCESS; + qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC); + if (NULL == rsp) { + QW_ERR_RET(qwMallocFetchRsp(len, &rsp)); + *pOutput = output; + } else { + pOutput->queryEnd = output.queryEnd; + pOutput->bufStatus = output.bufStatus; + pOutput->useconds = output.useconds; + } + + break; + } + + pOutput->bufStatus = DS_BUF_EMPTY; + + break; } - pOutput->bufStatus = DS_BUF_EMPTY; + // Got data from sink + QW_TASK_DLOG("there are data in sink, dataLength:%d", len); - return TSDB_CODE_SUCCESS; - } + *dataLen += len; - // Got data from sink - QW_TASK_DLOG("there are data in sink, dataLength:%d", len); + QW_ERR_RET(qwMallocFetchRsp(*dataLen, &rsp)); - *dataLen = len; + output.pData = rsp->data + *dataLen - len; + code = dsGetDataBlock(ctx->sinkHandle, &output); + if (code) { + QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code)); + QW_ERR_RET(code); + } - QW_ERR_RET(qwMallocFetchRsp(len, &rsp)); - *rspMsg = rsp; + pOutput->queryEnd = output.queryEnd; + pOutput->precision = output.precision; + pOutput->bufStatus = output.bufStatus; + pOutput->useconds = output.useconds; + pOutput->compressed = output.compressed; + pOutput->numOfCols = output.numOfCols; + pOutput->numOfRows += output.numOfRows; + pOutput->numOfBlocks++; + + if (DS_BUF_EMPTY == pOutput->bufStatus && pOutput->queryEnd) { + QW_TASK_DLOG("task all data fetched and done, fetched blocks %d rows %d", pOutput->numOfBlocks, pOutput->numOfRows); + qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC); + break; + } - pOutput->pData = rsp->data; - code = dsGetDataBlock(ctx->sinkHandle, pOutput); - if (code) { - QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code)); - QW_ERR_RET(code); - } + if (0 == ctx->level) { + QW_TASK_DLOG("task fetched blocks %d rows %d, level %d", pOutput->numOfBlocks, pOutput->numOfRows, ctx->level); + break; + } - if (DS_BUF_EMPTY == pOutput->bufStatus && pOutput->queryEnd) { - QW_TASK_DLOG_E("task all data fetched, done"); - qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC); + if (pOutput->numOfRows >= QW_MIN_RES_ROWS) { + QW_TASK_DLOG("task fetched blocks %d rows %d reaches the min rows", pOutput->numOfBlocks, pOutput->numOfRows); + break; + } } + *rspMsg = rsp; + return TSDB_CODE_SUCCESS; } @@ -551,6 +582,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, const char* sql) { // queryRsped = true; + ctx->level = plan->level; atomic_store_ptr(&ctx->taskHandle, pTaskInfo); atomic_store_ptr(&ctx->sinkHandle, sinkHandle);