提交 aa055abd 编写于 作者: H Haojun Liao

refactor:do some internal refactor.

上级 5c1f55f6
...@@ -136,7 +136,7 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table ...@@ -136,7 +136,7 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table
* @param handle * @param handle
* @return * @return
*/ */
int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, SLocalFetch *pLocal); int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bool* hasMore, SLocalFetch *pLocal);
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pBlock, uint64_t* useconds); int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pBlock, uint64_t* useconds);
/** /**
......
...@@ -690,7 +690,8 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma ...@@ -690,7 +690,8 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
while (1) { while (1) {
uint64_t ts; uint64_t ts;
int32_t code = qExecTaskOpt(taskInfo, pResList, &ts, NULL); bool hasMore = false;
int32_t code = qExecTaskOpt(taskInfo, pResList, &ts, &hasMore, NULL);
if (code < 0) { if (code < 0) {
if (code == TSDB_CODE_QRY_IN_EXEC) { if (code == TSDB_CODE_QRY_IN_EXEC) {
break; break;
......
...@@ -479,7 +479,7 @@ static void freeBlock(void* param) { ...@@ -479,7 +479,7 @@ static void freeBlock(void* param) {
blockDataDestroy(pBlock); blockDataDestroy(pBlock);
} }
int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, SLocalFetch* pLocal) { int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bool* hasMore, SLocalFetch* pLocal) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
int64_t threadId = taosGetSelfPthreadId(); int64_t threadId = taosGetSelfPthreadId();
...@@ -536,6 +536,7 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, SL ...@@ -536,6 +536,7 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, SL
} }
} }
*hasMore = (pRes != NULL);
uint64_t el = (taosGetTimestampUs() - st); uint64_t el = (taosGetTimestampUs() - st);
pTaskInfo->cost.elapsedTime += el; pTaskInfo->cost.elapsedTime += el;
......
...@@ -107,9 +107,12 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { ...@@ -107,9 +107,12 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
QW_TASK_DLOG("start to execTask, loopIdx:%d", i++); QW_TASK_DLOG("start to execTask, loopIdx:%d", i++);
// if *taskHandle is NULL, it's killed right now // if *taskHandle is NULL, it's killed right now
bool hasMore = false;
if (taskHandle) { if (taskHandle) {
qwDbgSimulateSleep(); qwDbgSimulateSleep();
code = qExecTaskOpt(taskHandle, pResList, &useconds, &localFetch);
code = qExecTaskOpt(taskHandle, pResList, &useconds, &hasMore, &localFetch);
if (code) { if (code) {
if (code != TSDB_CODE_OPS_NOT_SUPPORT) { if (code != TSDB_CODE_OPS_NOT_SUPPORT) {
QW_TASK_ELOG("qExecTask failed, code:%x - %s", code, tstrerror(code)); QW_TASK_ELOG("qExecTask failed, code:%x - %s", code, tstrerror(code));
...@@ -122,20 +125,8 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { ...@@ -122,20 +125,8 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
++execNum; ++execNum;
if (taosArrayGetSize(pResList) == 0) { size_t numOfResBlock = taosArrayGetSize(pResList);
QW_TASK_DLOG("qExecTask end with empty res, useconds:%" PRIu64, useconds); for (int32_t j = 0; j < numOfResBlock; ++j) {
dsEndPut(sinkHandle, useconds);
QW_ERR_JRET(qwHandleTaskComplete(QW_FPARAMS(), ctx));
if (queryStop) {
*queryStop = true;
}
break;
}
for (int32_t j = 0; j < taosArrayGetSize(pResList); ++j) {
SSDataBlock *pRes = taosArrayGetP(pResList, j); SSDataBlock *pRes = taosArrayGetP(pResList, j);
ASSERT(pRes->info.rows > 0); ASSERT(pRes->info.rows > 0);
...@@ -149,6 +140,23 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { ...@@ -149,6 +140,23 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
QW_TASK_DLOG("data put into sink, rows:%d, continueExecTask:%d", pRes->info.rows, qcontinue); QW_TASK_DLOG("data put into sink, rows:%d, continueExecTask:%d", pRes->info.rows, qcontinue);
} }
if (numOfResBlock == 0 || (hasMore == false)) {
if (numOfResBlock == 0) {
QW_TASK_DLOG("qExecTask end with empty res, useconds:%" PRIu64, useconds);
} else {
QW_TASK_DLOG("qExecTask done", "");
}
dsEndPut(sinkHandle, useconds);
QW_ERR_JRET(qwHandleTaskComplete(QW_FPARAMS(), ctx));
if (queryStop) {
*queryStop = true;
}
break;
}
if (!qcontinue) { if (!qcontinue) {
if (queryStop) { if (queryStop) {
*queryStop = true; *queryStop = true;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册