From 93be707cdb0858e5db748a7cca57d08922bd244e Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 11 Jan 2022 16:23:23 +0800 Subject: [PATCH] feature/qnode --- source/client/src/clientImpl.c | 2 +- source/libs/executor/src/executorMain.c | 6 +- source/libs/qworker/src/qworker.c | 9 +-- source/libs/scheduler/src/scheduler.c | 84 +++++++++++++------------ 4 files changed, 50 insertions(+), 51 deletions(-) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 14cb35355f..616397f8e8 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -546,7 +546,7 @@ void* doFetchRow(SRequestObj* pRequest) { if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) { if (pRequest->type == TDMT_VND_QUERY) { pRequest->type = TDMT_VND_FETCH; - scheduleFetchRows(pRequest->body.pQueryJob, &pRequest->body.resInfo.pData); + scheduleFetchRows(pRequest->body.pQueryJob, (void **)&pRequest->body.resInfo.pData); } else if (pRequest->type == TDMT_MND_SHOW) { pRequest->type = TDMT_MND_SHOW_RETRIEVE; } else if (pRequest->type == TDMT_VND_SHOW_TABLES) { diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index e8799542b2..3f77e15382 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -191,9 +191,9 @@ int32_t qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle) { return pTaskInfo->code; } - int32_t status = 0; + bool qcontinue = false; SInputData inputData = {.pData = pRes, .pTableRetrieveTsMap = NULL}; - pTaskInfo->code = dsPutDataBlock(pTaskInfo->dsHandle, &inputData, &status); + pTaskInfo->code = dsPutDataBlock(pTaskInfo->dsHandle, &inputData, &qcontinue); if (isTaskKilled(pTaskInfo)) { qDebug("QInfo:0x%" PRIx64 " task is killed", GET_TASKID(pTaskInfo)); @@ -202,7 +202,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle) { // pRuntimeEnv->resultInfo.total); } - if (status == DS_DATA_FULL) { + if (!qcontinue) { qDebug("QInfo:0x%"PRIx64" query paused, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d", GET_TASKID(pTaskInfo), 0, 0L, 0); return pTaskInfo->code; diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 91a27a831f..9401d38fc7 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -6,6 +6,7 @@ #include "qworkerInt.h" #include "tmsg.h" #include "tname.h" +#include "dataSinkMgt.h" int32_t qwValidateStatus(int8_t oriStatus, int8_t newStatus) { int32_t code = 0; @@ -891,14 +892,10 @@ int32_t qwHandleFetch(SQWorkerTaskHandlesCache *handles, SQWorkerMgmt *mgmt, uin QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } - code = dsGetDataLength(handles->sinkHandle, &dataLength, &queryEnd); - if (TSDB_CODE_SUCCESS != code) { - qError("dsGetDataLength failed, code:%x", code); - QW_ERR_JRET(code); - } + dsGetDataLength(handles->sinkHandle, &dataLength, &queryEnd); if (dataLength > 0) { - SOutPutData output = {0}; + SOutputData output = {0}; QW_ERR_JRET(qwInitFetchRsp(dataLength, &rsp)); output.pData = rsp->data; diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 9ce2e80951..37e061126b 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -509,6 +509,27 @@ int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, b } + +// Note: no more error processing, handled in function internal +int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) { + // if already FAILED, no more processing + SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_FAILED)); + + if (errCode) { + atomic_store_32(&pJob->errCode, errCode); + } + + if (atomic_load_8(&pJob->userFetch) || ((!SCH_JOB_NEED_FETCH(&pJob->attr)) && pJob->attr.syncSchedule)) { + tsem_post(&pJob->rspSem); + } + + SCH_ERR_RET(atomic_load_32(&pJob->errCode)); + + assert(0); +} + + + // Note: no more error processing, handled in function internal int32_t schFetchFromRemote(SSchJob *pJob) { int32_t code = 0; @@ -540,25 +561,6 @@ _return: } -// Note: no more error processing, handled in function internal -int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) { - // if already FAILED, no more processing - SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_FAILED)); - - if (errCode) { - atomic_store_32(&pJob->errCode, errCode); - } - - if (atomic_load_8(&pJob->userFetch) || ((!SCH_JOB_NEED_FETCH(&pJob->attr)) && pJob->attr.syncSchedule)) { - tsem_post(&pJob->rspSem); - } - - SCH_ERR_RET(atomic_load_32(&pJob->errCode)); - - assert(0); -} - - // Note: no more error processing, handled in function internal int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) { int32_t code = 0; @@ -760,8 +762,6 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch break; } - break; - } case TDMT_VND_QUERY_RSP: { SQueryTableRsp *rsp = (SQueryTableRsp *)msg; @@ -784,8 +784,6 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch break; } - break; - } case TDMT_VND_FETCH_RSP: { SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg; @@ -1316,7 +1314,7 @@ int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, stru SSchJob *job = NULL; - SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, (void **)&job, true)); + SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, &job, true)); *pJob = job; @@ -1333,7 +1331,7 @@ int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, SSchJob *job = NULL; - SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, (void **)&job, false)); + SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, &job, false)); *pJob = job; @@ -1368,11 +1366,11 @@ int32_t scheduleFetchRows(SSchJob *pJob, void** pData) { } if (status == JOB_TASK_STATUS_FAILED) { - *data = atomic_load_ptr(&pJob->res); + *pData = atomic_load_ptr(&pJob->res); atomic_store_ptr(&pJob->res, NULL); SCH_ERR_JRET(atomic_load_32(&pJob->errCode)); } else if (status == JOB_TASK_STATUS_SUCCEED) { - *data = atomic_load_ptr(&pJob->res); + *pData = atomic_load_ptr(&pJob->res); atomic_store_ptr(&pJob->res, NULL); goto _return; } else if (status == JOB_TASK_STATUS_PARTIAL_SUCCEED) { @@ -1392,9 +1390,9 @@ int32_t scheduleFetchRows(SSchJob *pJob, void** pData) { } while (true) { - *data = atomic_load_ptr(&pJob->res); + *pData = atomic_load_ptr(&pJob->res); - if (*data != atomic_val_compare_exchange_ptr(&pJob->res, *data, NULL)) { + if (*pData != atomic_val_compare_exchange_ptr(&pJob->res, *pData, NULL)) { continue; } @@ -1472,17 +1470,21 @@ void scheduleFreeJob(void *job) { taosArrayDestroy(pLevel->subTasks); } - taosHashCleanup(pJob->execTasks); - taosHashCleanup(pJob->failTasks); - taosHashCleanup(pJob->succTasks); - - taosArrayDestroy(pJob->levels); + taosHashCleanup(pJob->execTasks); + taosHashCleanup(pJob->failTasks); + taosHashCleanup(pJob->succTasks); - tfree(pJob->res); - - tfree(pJob); - } + taosArrayDestroy(pJob->levels); + + tfree(pJob->res); - void schedulerDestroy(void) { - if (schMgmt.jobs) { - taosHashCleanup(sch + tfree(pJob); +} + +void schedulerDestroy(void) { + if (schMgmt.jobs) { + taosHashCleanup(schMgmt.jobs); //TODO + schMgmt.jobs = NULL; + } +} + -- GitLab