From b1980e8f44f71d51c7c4184693878ad06f3ab2a9 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 13 Jan 2022 17:30:37 +0800 Subject: [PATCH] feature/qnode --- source/libs/qworker/inc/qworkerInt.h | 19 +-- source/libs/qworker/src/qworker.c | 203 ++++++++++++++++----------- 2 files changed, 130 insertions(+), 92 deletions(-) diff --git a/source/libs/qworker/inc/qworkerInt.h b/source/libs/qworker/inc/qworkerInt.h index 2f47c79065..4030ad82ad 100644 --- a/source/libs/qworker/inc/qworkerInt.h +++ b/source/libs/qworker/inc/qworkerInt.h @@ -70,6 +70,7 @@ typedef struct SQWTaskCtx { SRWLatch lock; int8_t sinkScheduled; int8_t queryScheduled; + bool needRsp; qTaskInfo_t taskHandle; DataSinkHandle sinkHandle; @@ -99,7 +100,7 @@ typedef struct SQWorkerMgmt { #define QW_TASK_NOT_EXIST(code) (TSDB_CODE_QRY_SCH_NOT_EXIST == (code) || TSDB_CODE_QRY_TASK_NOT_EXIST == (code)) #define QW_TASK_ALREADY_EXIST(code) (TSDB_CODE_QRY_TASK_ALREADY_EXIST == (code)) -#define QW_TASK_READY_RESP(status) (status == JOB_TASK_STATUS_SUCCEED || status == JOB_TASK_STATUS_FAILED || status == JOB_TASK_STATUS_CANCELLED || status == JOB_TASK_STATUS_PARTIAL_SUCCEED) +#define QW_TASK_READY(status) (status == JOB_TASK_STATUS_SUCCEED || status == JOB_TASK_STATUS_FAILED || status == JOB_TASK_STATUS_CANCELLED || status == JOB_TASK_STATUS_PARTIAL_SUCCEED) #define QW_SET_QTID(id, qId, tId) do { *(uint64_t *)(id) = (qId); *(uint64_t *)((char *)(id) + sizeof(qId)) = (tId); } while (0) #define QW_GET_QTID(id, qId, tId) do { (qId) = *(uint64_t *)(id); (tId) = *(uint64_t *)((char *)(id) + sizeof(qId)); } while (0) #define QW_IDS() sId, qId, tId @@ -111,16 +112,16 @@ typedef struct SQWorkerMgmt { #define QW_ELOG(param, ...) qError("QW:%p " param, mgmt, __VA_ARGS__) #define QW_DLOG(param, ...) qDebug("QW:%p " param, mgmt, __VA_ARGS__) -#define QW_SCH_ELOG(param, ...) qError("QW:%p SID:%"PRIx64 param, mgmt, sId, __VA_ARGS__) -#define QW_SCH_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64 param, mgmt, sId, __VA_ARGS__) +#define QW_SCH_ELOG(param, ...) qError("QW:%p SID:%"PRIx64" " param, mgmt, sId, __VA_ARGS__) +#define QW_SCH_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64" " param, mgmt, sId, __VA_ARGS__) -#define QW_TASK_ELOG(param, ...) qError("QW:%p QID:%"PRIx64",TID:%"PRIx64 param, mgmt, qId, tId, __VA_ARGS__) -#define QW_TASK_WLOG(param, ...) qWarn("QW:%p QID:%"PRIx64",TID:%"PRIx64 param, mgmt, qId, tId, __VA_ARGS__) -#define QW_TASK_DLOG(param, ...) qDebug("QW:%p QID:%"PRIx64",TID:%"PRIx64 param, mgmt, qId, tId, __VA_ARGS__) +#define QW_TASK_ELOG(param, ...) qError("QW:%p QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__) +#define QW_TASK_WLOG(param, ...) qWarn("QW:%p QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__) +#define QW_TASK_DLOG(param, ...) qDebug("QW:%p QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__) -#define QW_SCH_TASK_ELOG(param, ...) qError("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64 param, mgmt, sId, qId, tId, __VA_ARGS__) -#define QW_SCH_TASK_WLOG(param, ...) qWarn("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64 param, mgmt, sId, qId, tId, __VA_ARGS__) -#define QW_SCH_TASK_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64 param, mgmt, sId, qId, tId, __VA_ARGS__) +#define QW_SCH_TASK_ELOG(param, ...) qError("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__) +#define QW_SCH_TASK_WLOG(param, ...) qWarn("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__) +#define QW_SCH_TASK_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__) #define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000 diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 921298995f..1be190065a 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -221,7 +221,7 @@ int32_t qwAddTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t code = 0; QW_ERR_RET(qwAcquireAddScheduler(QW_READ, mgmt, sId, &tsch)); - QW_ERR_JRET(qwAddTaskImpl(mgmt, tsch, 0, qId, tId, JOB_TASK_STATUS_NOT_START, QW_EXIST_RET_ERR, NULL)); + QW_ERR_JRET(qwAddTaskImpl(mgmt, tsch, 0, qId, tId, status, QW_EXIST_RET_ERR, NULL)); _return: @@ -557,6 +557,7 @@ int32_t qwCancelDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_ } QW_UNLOCK(QW_WRITE, &task->lock); + qwReleaseTask(QW_READ, sch); qwReleaseScheduler(QW_READ, mgmt); @@ -778,30 +779,35 @@ int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq* pFetchRe return TSDB_CODE_SUCCESS; } -int32_t qwCheckAndSendReadyRsp(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg, int32_t rspCode) { +int32_t qwCheckAndSendReadyRsp(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SRpcMsg *pMsg) { SQWSchStatus *sch = NULL; SQWTaskStatus *task = NULL; int32_t code = 0; QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch)); - QW_ERR_JRET(qwAcquireTask(mgmt, QW_READ, sch, queryId, taskId, &task)); + QW_ERR_JRET(qwAcquireTask(mgmt, QW_READ, sch, qId, tId, &task)); QW_LOCK(QW_WRITE, &task->lock); if (QW_READY_NOT_RECEIVED == task->ready) { + QW_SCH_TASK_DLOG("ready not received, ready:%d", task->ready); + goto _return; + } else if (QW_READY_RECEIVED == task->ready) { + task->ready = QW_READY_RESPONSED; + int32_t rspCode = task->code; + QW_UNLOCK(QW_WRITE, &task->lock); - qwReleaseTask(QW_READ, sch); qwReleaseScheduler(QW_READ, mgmt); - return TSDB_CODE_SUCCESS; - } else if (QW_READY_RECEIVED == task->ready) { - QW_ERR_JRET(qwBuildAndSendReadyRsp(pMsg, rspCode)); + QW_ERR_RET(qwBuildAndSendReadyRsp(pMsg, rspCode)); + + QW_SCH_TASK_DLOG("ready response sent, ready:%d", task->ready); - task->ready = QW_READY_RESPONSED; + return TSDB_CODE_SUCCESS; } else if (QW_READY_RESPONSED == task->ready) { - qError("query response already send"); + QW_SCH_TASK_ELOG("ready response already send, ready:%d", task->ready); QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } else { assert(0); @@ -812,7 +818,6 @@ _return: if (task) { QW_UNLOCK(QW_WRITE, &task->lock); qwReleaseTask(QW_READ, sch); - } qwReleaseScheduler(QW_READ, mgmt); @@ -820,34 +825,39 @@ _return: QW_RET(code); } -int32_t qwSetAndSendReadyRsp(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg) { +int32_t qwSetAndSendReadyRsp(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SRpcMsg *pMsg) { SQWSchStatus *sch = NULL; SQWTaskStatus *task = NULL; int32_t code = 0; QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch)); - QW_ERR_JRET(qwAcquireTask(mgmt, QW_READ, sch, queryId, taskId, &task)); + QW_ERR_JRET(qwAcquireTask(mgmt, QW_READ, sch, qId, tId, &task)); QW_LOCK(QW_WRITE, &task->lock); - if (QW_TASK_READY_RESP(task->status)) { - QW_ERR_JRET(qwBuildAndSendReadyRsp(pMsg, task->code)); + int8_t status = task->status; + int32_t errCode = task->code; + + if (QW_TASK_READY(status)) { task->ready = QW_READY_RESPONSED; + + QW_UNLOCK(QW_WRITE, &task->lock); + + QW_ERR_JRET(qwBuildAndSendReadyRsp(pMsg, errCode)); + + QW_SCH_TASK_DLOG("task ready responsed, status:%d", status); } else { task->ready = QW_READY_RECEIVED; + QW_UNLOCK(QW_WRITE, &task->lock); - qwReleaseTask(QW_READ, sch); - qwReleaseScheduler(QW_READ, mgmt); - - return TSDB_CODE_SUCCESS; + QW_SCH_TASK_DLOG("task ready NOT responsed, status:%d", status); } _return: if (task) { - QW_UNLOCK(QW_WRITE, &task->lock); qwReleaseTask(QW_READ, sch); } @@ -872,22 +882,15 @@ int32_t qwCheckAndProcessTaskDrop(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId qwReleaseScheduler(QW_READ, mgmt); return TSDB_CODE_SUCCESS; } - - QW_LOCK(QW_READ, &task->lock); - if ((!task->cancel) && (!task->drop)) { - QW_TASK_ELOG("no cancel or drop but task exists, status:%d", task->status); - - QW_UNLOCK(QW_READ, &task->lock); - + if ((!atomic_load_8(&task->cancel)) && (!atomic_load_8(&task->drop))) { + QW_TASK_ELOG("no cancel or drop but task exists, status:%d", atomic_load_8(&task->status)); QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } - QW_UNLOCK(QW_READ, &task->lock); - *needStop = true; - if (task->cancel) { + if (atomic_load_8(&task->cancel)) { QW_LOCK(QW_WRITE, &task->lock); code = qwUpdateTaskInfo(mgmt, task, QW_TASK_INFO_STATUS, &status, QW_IDS()); QW_UNLOCK(QW_WRITE, &task->lock); @@ -929,13 +932,15 @@ int32_t qwQueryPostProcess(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint6 QW_ERR_RET(code); } + QW_LOCK(QW_WRITE, &task->lock); + if (task->cancel) { - QW_LOCK(QW_WRITE, &task->lock); qwUpdateTaskInfo(mgmt, task, QW_TASK_INFO_STATUS, &newStatus, QW_IDS()); - QW_UNLOCK(QW_WRITE, &task->lock); } if (task->drop) { + QW_UNLOCK(QW_WRITE, &task->lock); + qwReleaseTask(QW_READ, sch); qwReleaseScheduler(QW_READ, mgmt); @@ -944,12 +949,12 @@ int32_t qwQueryPostProcess(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint6 return TSDB_CODE_SUCCESS; } - if ((!(task->cancel || task->drop)) && status > 0) { - QW_LOCK(QW_WRITE, &task->lock); + if (!(task->cancel || task->drop)) { qwUpdateTaskInfo(mgmt, task, QW_TASK_INFO_STATUS, &status, QW_IDS()); task->code = errCode; - QW_UNLOCK(QW_WRITE, &task->lock); } + + QW_UNLOCK(QW_WRITE, &task->lock); qwReleaseTask(QW_READ, sch); qwReleaseScheduler(QW_READ, mgmt); @@ -995,24 +1000,24 @@ int32_t qwScheduleDataSink(SQWTaskCtx *handles, SQWorkerMgmt *mgmt, uint64_t sId return TSDB_CODE_SUCCESS; } -int32_t qwScheduleQuery(SQWTaskCtx *handles, SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg) { +int32_t qwScheduleQuery(SQWTaskCtx *handles, SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SRpcMsg *pMsg) { if (atomic_load_8(&handles->queryScheduled)) { - qDebug("query already scheduled"); + QW_SCH_TASK_ELOG("query already scheduled, queryScheduled:%d", handles->queryScheduled); return TSDB_CODE_SUCCESS; } - QW_ERR_RET(qwUpdateTaskStatus(mgmt, sId, queryId, taskId, JOB_TASK_STATUS_EXECUTING)); + QW_ERR_RET(qwUpdateTaskStatus(mgmt, sId, qId, tId, JOB_TASK_STATUS_EXECUTING)); SQueryContinueReq * req = (SQueryContinueReq *)rpcMallocCont(sizeof(SQueryContinueReq)); if (NULL == req) { - qError("rpcMallocCont %d failed", (int32_t)sizeof(SQueryContinueReq)); + QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(SQueryContinueReq)); QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } req->header.vgId = mgmt->nodeId; req->sId = sId; - req->queryId = queryId; - req->taskId = taskId; + req->queryId = qId; + req->taskId = tId; SRpcMsg pNewMsg = { .handle = pMsg->handle, @@ -1025,20 +1030,21 @@ int32_t qwScheduleQuery(SQWTaskCtx *handles, SQWorkerMgmt *mgmt, uint64_t sId, u int32_t code = (*mgmt->putToQueueFp)(mgmt->nodeObj, &pNewMsg); if (TSDB_CODE_SUCCESS != code) { - qError("put query continue msg to queue failed, code:%x", code); + QW_SCH_TASK_ELOG("put query continue msg to queue failed, code:%x", code); rpcFreeCont(req); QW_ERR_RET(code); } + handles->queryScheduled = true; - qDebug("put query continue msg to query queue"); + QW_SCH_TASK_DLOG("put query continue msg to query queue, vgId:%d", mgmt->nodeId); return TSDB_CODE_SUCCESS; } -int32_t qwHandleFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg) { +int32_t qwHandleFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SRpcMsg *pMsg) { SQWSchStatus *sch = NULL; SQWTaskStatus *task = NULL; int32_t code = 0; @@ -1049,25 +1055,29 @@ int32_t qwHandleFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64 SRetrieveTableRsp *rsp = NULL; bool queryEnd = false; SQWTaskCtx *handles = NULL; + int8_t status = 0; - QW_ERR_JRET(qwAcquireTaskCtx(QW_READ, mgmt, queryId, taskId, &handles)); - if (atomic_load_8(&handles->needRsp)) { - qError("last fetch not responsed"); + QW_ERR_JRET(qwAcquireTaskCtx(QW_READ, mgmt, qId, tId, &handles)); + QW_LOCK(QW_WRITE, &handles->lock); + + if (handles->needRsp) { + QW_UNLOCK(QW_WRITE, &handles->lock); + QW_SCH_TASK_ELOG("last fetch not responsed, needRsp:%d", handles->needRsp); QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } - QW_ERR_JRET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch)); - QW_ERR_JRET(qwAcquireTask(mgmt, QW_READ, sch, queryId, taskId, &task)); + QW_UNLOCK(QW_WRITE, &handles->lock); - QW_LOCK(QW_READ, &task->lock); + QW_ERR_JRET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch)); + QW_ERR_JRET(qwAcquireTask(mgmt, QW_READ, sch, qId, tId, &task)); if (task->cancel || task->drop) { - qError("task is already cancelled or dropped"); + QW_SCH_TASK_ELOG("task is already cancelled or dropped, cancel:%d, drop:%d", task->cancel, task->drop); QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } if (task->status != JOB_TASK_STATUS_EXECUTING && task->status != JOB_TASK_STATUS_PARTIAL_SUCCEED) { - qError("invalid status %d for fetch", task->status); + QW_SCH_TASK_ELOG("invalid status %d for fetch", task->status); QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } @@ -1075,6 +1085,9 @@ int32_t qwHandleFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64 if (dataLength > 0) { SOutputData output = {0}; + + QW_SCH_TASK_DLOG("task got data in sink, dataLength:%d", dataLength); + QW_ERR_JRET(qwInitFetchRsp(dataLength, &rsp)); output.pData = rsp->data; @@ -1095,27 +1108,38 @@ int32_t qwHandleFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64 if (DS_BUF_EMPTY == output.bufStatus && output.queryEnd) { rsp->completed = 1; - QW_ERR_JRET(qwUpdateTaskStatus(mgmt, sId, queryId, taskId, JOB_TASK_STATUS_SUCCEED)); + status = JOB_TASK_STATUS_SUCCEED; + + QW_SCH_TASK_DLOG("task all fetched, status:%d", status); + QW_ERR_JRET(qwUpdateTaskInfo(mgmt, task, QW_TASK_INFO_STATUS, &status, QW_IDS())); } // Note: schedule data sink firstly and will schedule query after it's done if (output.needSchedule) { - QW_ERR_JRET(qwScheduleDataSink(handles, mgmt, sId, queryId, taskId, pMsg)); - } else if ((!output.queryEnd) && (DS_BUF_LOW == output.bufStatus || DS_BUF_EMPTY == output.bufStatus)) { - QW_ERR_JRET(qwScheduleQuery(handles, mgmt, sId, queryId, taskId, pMsg)); + QW_SCH_TASK_DLOG("sink need schedule, queryEnd:%d", output.queryEnd); + QW_ERR_JRET(qwScheduleDataSink(handles, mgmt, sId, qId, tId, pMsg)); + } else if ((!output.queryEnd) && (DS_BUF_LOW == output.bufStatus || DS_BUF_EMPTY == output.bufStatus)) { + QW_SCH_TASK_DLOG("task not end, need to continue, bufStatus:%d", output.bufStatus); + QW_ERR_JRET(qwScheduleQuery(handles, mgmt, sId, qId, tId, pMsg)); } } else { if (dataLength < 0) { - qError("invalid length from dsGetDataLength, length:%d", dataLength); + QW_SCH_TASK_ELOG("invalid length from dsGetDataLength, length:%d", dataLength); QW_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } if (queryEnd) { - QW_ERR_JRET(qwUpdateTaskStatus(mgmt, sId, queryId, taskId, JOB_TASK_STATUS_SUCCEED)); + status = JOB_TASK_STATUS_SUCCEED; + + QW_SCH_TASK_DLOG("no data in sink and query end, dataLength:%d", dataLength); + + QW_ERR_JRET(qwUpdateTaskInfo(mgmt, task, QW_TASK_INFO_STATUS, &status, QW_IDS())); } else { assert(0 == handles->needRsp); + + // MUST IN SCHEDULE OR IN SINK SCHEDULE - qDebug("no res data in sink, need response later"); + QW_SCH_TASK_DLOG("no res data in sink, need response later, queryEnd:%d", queryEnd); QW_LOCK(QW_WRITE, &handles->lock); handles->needRsp = true; @@ -1128,7 +1152,6 @@ int32_t qwHandleFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64 _return: if (task) { - QW_UNLOCK(QW_READ, &task->lock); qwReleaseTask(QW_READ, sch); } @@ -1212,10 +1235,10 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { int32_t code = 0; bool queryRsped = false; bool needStop = false; - bool taskAdded = false; struct SSubplan *plan = NULL; SSubQueryMsg *msg = pMsg->pCont; SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; + int32_t rspCode = 0; if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { QW_ELOG("invalid query msg, contLen:%d", pMsg->contLen); @@ -1237,6 +1260,8 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { qwBuildAndSendQueryRsp(pMsg, TSDB_CODE_QRY_TASK_CANCELLED); QW_ERR_RET(TSDB_CODE_QRY_TASK_CANCELLED); } + + QW_ERR_JRET(qwAddTask(qWorkerMgmt, sId, qId, tId, JOB_TASK_STATUS_EXECUTING)); code = qStringToSubplan(msg->msg, &plan); if (TSDB_CODE_SUCCESS != code) { @@ -1248,53 +1273,49 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { code = qCreateExecTask(node, 0, (struct SSubplan *)plan, &pTaskInfo); if (code) { QW_TASK_ELOG("qCreateExecTask failed, code:%x", code); - QW_ERR_JRET(qwAddTask(qWorkerMgmt, sId, qId, tId, JOB_TASK_STATUS_FAILED)); QW_ERR_JRET(code); } - QW_ERR_JRET(qwAddTask(qWorkerMgmt, sId, qId, tId, JOB_TASK_STATUS_EXECUTING)); - - taskAdded = true; - QW_ERR_JRET(qwBuildAndSendQueryRsp(pMsg, TSDB_CODE_SUCCESS)); queryRsped = true; DataSinkHandle sinkHandle = NULL; code = qExecTask(pTaskInfo, &sinkHandle); - if (code) { QW_TASK_ELOG("qExecTask failed, code:%x", code); QW_ERR_JRET(code); } QW_ERR_JRET(qwAddTaskHandlesToCache(qWorkerMgmt, msg->queryId, msg->taskId, pTaskInfo, sinkHandle)); - QW_ERR_JRET(qwUpdateTaskStatus(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, JOB_TASK_STATUS_PARTIAL_SUCCEED)); _return: - if (queryRsped) { - code = qwCheckAndSendReadyRsp(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, pMsg, code); - } else { - code = qwBuildAndSendQueryRsp(pMsg, code); + if (code) { + rspCode = code; + } + + if (!queryRsped) { + code = qwBuildAndSendQueryRsp(pMsg, rspCode); + if (TSDB_CODE_SUCCESS == rspCode && code) { + rspCode = code; + } } int8_t status = 0; - if (TSDB_CODE_SUCCESS != code) { + if (TSDB_CODE_SUCCESS != rspCode) { status = JOB_TASK_STATUS_FAILED; } else { status = JOB_TASK_STATUS_PARTIAL_SUCCEED; } - if (!taskAdded) { - qwAddTask(qWorkerMgmt, sId, qId, tId, status); - - status = -1; - } + qwQueryPostProcess(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, status, rspCode); - qwQueryPostProcess(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, status, code); + if (queryRsped) { + qwCheckAndSendReadyRsp(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, pMsg); + } - QW_RET(code); + QW_RET(rspCode); } int32_t qWorkerProcessQueryContinueMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { @@ -1306,19 +1327,27 @@ int32_t qWorkerProcessQueryContinueMsg(void *node, void *qWorkerMgmt, SRpcMsg *p SQWTaskCtx *handles = NULL; QW_ERR_JRET(qwAcquireTaskCtx(QW_READ, qWorkerMgmt, req->queryId, req->taskId, &handles)); + QW_LOCK(QW_WRITE, &handles->lock); qTaskInfo_t taskHandle = handles->taskHandle; DataSinkHandle sinkHandle = handles->sinkHandle; - bool needRsp = handles->needRsp; + QW_UNLOCK(QW_WRITE, &handles->lock); qwReleaseTaskResCache(QW_READ, qWorkerMgmt); QW_ERR_JRET(qwCheckAndProcessTaskDrop(qWorkerMgmt, req->sId, req->queryId, req->taskId, &needStop)); if (needStop) { qWarn("task need stop"); - if (needRsp) { + + QW_ERR_JRET(qwAcquireTaskCtx(QW_READ, qWorkerMgmt, req->queryId, req->taskId, &handles)); + QW_LOCK(QW_WRITE, &handles->lock); + if (handles->needRsp) { qwBuildAndSendQueryRsp(pMsg, TSDB_CODE_QRY_TASK_CANCELLED); + handles->needRsp = false; } + QW_UNLOCK(QW_WRITE, &handles->lock); + qwReleaseTaskResCache(QW_READ, qWorkerMgmt); + QW_ERR_RET(TSDB_CODE_QRY_TASK_CANCELLED); } @@ -1336,10 +1365,18 @@ int32_t qWorkerProcessQueryContinueMsg(void *node, void *qWorkerMgmt, SRpcMsg *p _return: - if (needRsp) { + QW_ERR_JRET(qwAcquireTaskCtx(QW_READ, qWorkerMgmt, req->queryId, req->taskId, &handles)); + QW_LOCK(QW_WRITE, &handles->lock); + + if (handles->needRsp) { code = qwBuildAndSendQueryRsp(pMsg, code); + handles->needRsp = false; } - + handles->queryScheduled = false; + + QW_UNLOCK(QW_WRITE, &handles->lock); + qwReleaseTaskResCache(QW_READ, qWorkerMgmt); + if (TSDB_CODE_SUCCESS != code) { status = JOB_TASK_STATUS_FAILED; } else { -- GitLab