提交 ead033e9 编写于 作者: H Haojun Liao

Merge branch 'feature/3.0_query_optimize.0' into feature/3.0_query_optimize

...@@ -1364,6 +1364,7 @@ typedef struct { ...@@ -1364,6 +1364,7 @@ typedef struct {
int8_t compressed; int8_t compressed;
int8_t streamBlockType; int8_t streamBlockType;
int32_t compLen; int32_t compLen;
int32_t numOfBlocks;
int32_t numOfRows; int32_t numOfRows;
int32_t numOfCols; int32_t numOfCols;
int64_t skey; int64_t skey;
......
...@@ -67,6 +67,7 @@ typedef struct SInputData { ...@@ -67,6 +67,7 @@ typedef struct SInputData {
} SInputData; } SInputData;
typedef struct SOutputData { typedef struct SOutputData {
int32_t numOfBlocks;
int32_t numOfRows; int32_t numOfRows;
int32_t numOfCols; int32_t numOfCols;
int8_t compressed; int8_t compressed;
......
...@@ -35,6 +35,7 @@ extern "C" { ...@@ -35,6 +35,7 @@ extern "C" {
#define QW_DEFAULT_SHORT_RUN_TIMES 2 #define QW_DEFAULT_SHORT_RUN_TIMES 2
#define QW_DEFAULT_HEARTBEAT_MSEC 5000 #define QW_DEFAULT_HEARTBEAT_MSEC 5000
#define QW_SCH_TIMEOUT_MSEC 180000 #define QW_SCH_TIMEOUT_MSEC 180000
#define QW_MIN_RES_ROWS 4096
enum { enum {
QW_PHASE_PRE_QUERY = 1, QW_PHASE_PRE_QUERY = 1,
...@@ -135,6 +136,7 @@ typedef struct SQWTaskCtx { ...@@ -135,6 +136,7 @@ typedef struct SQWTaskCtx {
int32_t msgType; int32_t msgType;
int32_t fetchType; int32_t fetchType;
int32_t execId; int32_t execId;
int32_t level;
bool queryRsped; bool queryRsped;
bool queryEnd; bool queryEnd;
......
...@@ -12,14 +12,16 @@ ...@@ -12,14 +12,16 @@
int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp) { int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp) {
int32_t msgSize = sizeof(SRetrieveTableRsp) + length; int32_t msgSize = sizeof(SRetrieveTableRsp) + length;
SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(msgSize); SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcReallocCont(*rsp, msgSize);
if (NULL == pRsp) { if (NULL == pRsp) {
qError("rpcMallocCont %d failed", msgSize); qError("rpcMallocCont %d failed", msgSize);
QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
memset(pRsp, 0, sizeof(SRetrieveTableRsp)); if (NULL == *rsp) {
memset(pRsp, 0, sizeof(SRetrieveTableRsp));
}
*rsp = pRsp; *rsp = pRsp;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -35,6 +37,7 @@ void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete) ...@@ -35,6 +37,7 @@ void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete)
rsp->compLen = htonl(len); rsp->compLen = htonl(len);
rsp->numOfRows = htonl(input->numOfRows); rsp->numOfRows = htonl(input->numOfRows);
rsp->numOfCols = htonl(input->numOfCols); rsp->numOfCols = htonl(input->numOfCols);
rsp->numOfBlocks = htonl(input->numOfBlocks);
} }
void qwFreeFetchRsp(void *msg) { void qwFreeFetchRsp(void *msg) {
......
...@@ -203,57 +203,88 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, ...@@ -203,57 +203,88 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
SRetrieveTableRsp *rsp = NULL; SRetrieveTableRsp *rsp = NULL;
bool queryEnd = false; bool queryEnd = false;
int32_t code = 0; int32_t code = 0;
SOutputData output = {0};
dsGetDataLength(ctx->sinkHandle, &len, &queryEnd); *dataLen = 0;
if (len < 0) { while (true) {
QW_TASK_ELOG("invalid length from dsGetDataLength, length:%d", len); dsGetDataLength(ctx->sinkHandle, &len, &queryEnd);
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
if (len == 0) { if (len < 0) {
if (queryEnd) { QW_TASK_ELOG("invalid length from dsGetDataLength, length:%d", len);
code = dsGetDataBlock(ctx->sinkHandle, pOutput); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
if (code) { }
QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code));
QW_ERR_RET(code);
}
QW_TASK_DLOG_E("no data in sink and query end"); if (len == 0) {
if (queryEnd) {
code = dsGetDataBlock(ctx->sinkHandle, &output);
if (code) {
QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code));
QW_ERR_RET(code);
}
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC); QW_TASK_DLOG("no more data in sink and query end, fetched blocks %d rows %d", pOutput->numOfBlocks, pOutput->numOfRows);
QW_ERR_RET(qwMallocFetchRsp(len, &rsp));
*rspMsg = rsp; qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC);
*dataLen = 0; if (NULL == rsp) {
return TSDB_CODE_SUCCESS; QW_ERR_RET(qwMallocFetchRsp(len, &rsp));
*pOutput = output;
} else {
pOutput->queryEnd = output.queryEnd;
pOutput->bufStatus = output.bufStatus;
pOutput->useconds = output.useconds;
}
break;
}
pOutput->bufStatus = DS_BUF_EMPTY;
break;
} }
pOutput->bufStatus = DS_BUF_EMPTY; // Got data from sink
QW_TASK_DLOG("there are data in sink, dataLength:%d", len);
return TSDB_CODE_SUCCESS; *dataLen += len;
}
// Got data from sink QW_ERR_RET(qwMallocFetchRsp(*dataLen, &rsp));
QW_TASK_DLOG("there are data in sink, dataLength:%d", len);
*dataLen = len; output.pData = rsp->data + *dataLen - len;
code = dsGetDataBlock(ctx->sinkHandle, &output);
if (code) {
QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code));
QW_ERR_RET(code);
}
QW_ERR_RET(qwMallocFetchRsp(len, &rsp)); pOutput->queryEnd = output.queryEnd;
*rspMsg = rsp; pOutput->precision = output.precision;
pOutput->bufStatus = output.bufStatus;
pOutput->useconds = output.useconds;
pOutput->compressed = output.compressed;
pOutput->numOfCols = output.numOfCols;
pOutput->numOfRows += output.numOfRows;
pOutput->numOfBlocks++;
if (DS_BUF_EMPTY == pOutput->bufStatus && pOutput->queryEnd) {
QW_TASK_DLOG("task all data fetched and done, fetched blocks %d rows %d", pOutput->numOfBlocks, pOutput->numOfRows);
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC);
break;
}
pOutput->pData = rsp->data; if (0 == ctx->level) {
code = dsGetDataBlock(ctx->sinkHandle, pOutput); QW_TASK_DLOG("task fetched blocks %d rows %d, level %d", pOutput->numOfBlocks, pOutput->numOfRows, ctx->level);
if (code) { break;
QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code)); }
QW_ERR_RET(code);
}
if (DS_BUF_EMPTY == pOutput->bufStatus && pOutput->queryEnd) { if (pOutput->numOfRows >= QW_MIN_RES_ROWS) {
QW_TASK_DLOG_E("task all data fetched, done"); QW_TASK_DLOG("task fetched blocks %d rows %d reaches the min rows", pOutput->numOfBlocks, pOutput->numOfRows);
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC); break;
}
} }
*rspMsg = rsp;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -551,6 +582,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, const char* sql) { ...@@ -551,6 +582,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, const char* sql) {
// queryRsped = true; // queryRsped = true;
ctx->level = plan->level;
atomic_store_ptr(&ctx->taskHandle, pTaskInfo); atomic_store_ptr(&ctx->taskHandle, pTaskInfo);
atomic_store_ptr(&ctx->sinkHandle, sinkHandle); atomic_store_ptr(&ctx->sinkHandle, sinkHandle);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册