diff --git a/source/libs/qworker/inc/qworkerInt.h b/source/libs/qworker/inc/qworkerInt.h index 5f9b33f7e301b326c019eb6e1fdf83c85e455355..9a332684722b40926739eb557a83def16caea289 100644 --- a/source/libs/qworker/inc/qworkerInt.h +++ b/source/libs/qworker/inc/qworkerInt.h @@ -83,7 +83,7 @@ typedef struct SQWMsg { } SQWMsg; typedef struct SQWPhaseInput { - int8_t status; + int8_t taskStatus; int8_t taskType; int32_t code; qTaskInfo_t taskHandle; diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 566356e25595b75cb2e58de8d3fc72a858ddf99e..cedb3fa926b8f3b31236f082b890107673f9434e 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -240,14 +240,14 @@ void qwReleaseTaskStatus(int32_t rwType, SQWSchStatus *sch) { } -int32_t qwAcquireTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, SQWTaskCtx **ctx) { +int32_t qwAcquireTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWTaskCtx **ctx) { char id[sizeof(qId) + sizeof(tId)] = {0}; QW_SET_QTID(id, qId, tId); - QW_LOCK(rwType, &mgmt->ctxLock); - *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id)); + //QW_LOCK(rwType, &mgmt->ctxLock); + *ctx = taosHashAcquire(mgmt->ctxHash, id, sizeof(id)); if (NULL == (*ctx)) { - QW_UNLOCK(rwType, &mgmt->ctxLock); + //QW_UNLOCK(rwType, &mgmt->ctxLock); QW_TASK_ELOG("ctx not in ctxHash, id:%s", id); QW_ERR_RET(TSDB_CODE_QRY_RES_CACHE_NOT_EXIST); } @@ -268,19 +268,19 @@ int32_t qwGetTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tI return TSDB_CODE_SUCCESS; } -int32_t qwAddTaskCtxImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, int32_t status, SQWTaskCtx **ctx) { +int32_t qwAddTaskCtxImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, bool acquire, int32_t status, SQWTaskCtx **ctx) { char id[sizeof(qId) + sizeof(tId)] = {0}; QW_SET_QTID(id, qId, tId); SQWTaskCtx nctx = {0}; - QW_LOCK(QW_WRITE, &mgmt->ctxLock); + //QW_LOCK(QW_WRITE, &mgmt->ctxLock); int32_t code = taosHashPut(mgmt->ctxHash, id, sizeof(id), &nctx, sizeof(SQWTaskCtx)); if (0 != code) { - QW_UNLOCK(QW_WRITE, &mgmt->ctxLock); + //QW_UNLOCK(QW_WRITE, &mgmt->ctxLock); if (HASH_NODE_EXIST(code)) { - if (rwType && ctx) { + if (acquire && ctx) { QW_RET(qwAcquireTaskCtx(QW_FPARAMS(), rwType, ctx)); } else if (ctx) { QW_RET(qwGetTaskCtx(QW_FPARAMS(), ctx)); @@ -293,9 +293,9 @@ int32_t qwAddTaskCtxImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_ QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } } - QW_UNLOCK(QW_WRITE, &mgmt->ctxLock); + //QW_UNLOCK(QW_WRITE, &mgmt->ctxLock); - if (rwType && ctx) { + if (acquire && ctx) { QW_RET(qwAcquireTaskCtx(QW_FPARAMS(), rwType, ctx)); } else if (ctx) { QW_RET(qwGetTaskCtx(QW_FPARAMS(), ctx)); @@ -305,22 +305,23 @@ int32_t qwAddTaskCtxImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_ } int32_t qwAddTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) { - QW_RET(qwAddTaskCtxImpl(QW_FPARAMS(), 0, 0, NULL)); + QW_RET(qwAddTaskCtxImpl(QW_FPARAMS(), false, 0, NULL)); } -int32_t qwAddAcquireTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, SQWTaskCtx **ctx) { - return qwAddTaskCtxImpl(QW_FPARAMS(), rwType, 0, ctx); +int32_t qwAddAcquireTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWTaskCtx **ctx) { + return qwAddTaskCtxImpl(QW_FPARAMS(), true, 0, ctx); } int32_t qwAddGetTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWTaskCtx **ctx) { - return qwAddTaskCtxImpl(QW_FPARAMS(), 0, 0, ctx); + return qwAddTaskCtxImpl(QW_FPARAMS(), false, 0, ctx); } -void qwReleaseTaskCtx(int32_t rwType, SQWorkerMgmt *mgmt) { - QW_UNLOCK(rwType, &mgmt->ctxLock); +void qwReleaseTaskCtx(SQWorkerMgmt *mgmt, void *ctx) { + //QW_UNLOCK(rwType, &mgmt->ctxLock); + taosHashRelease(mgmt->ctxHash, ctx); } void qwFreeTaskHandle(QW_FPARAMS_DEF, qTaskInfo_t *taskHandle) { @@ -355,7 +356,7 @@ void qwFreeTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { // Note: NEED CTX HASH LOCKED BEFORE ENTRANCE -int32_t qwDropTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) { +int32_t qwDropTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType) { char id[sizeof(qId) + sizeof(tId)] = {0}; QW_SET_QTID(id, qId, tId); SQWTaskCtx octx; @@ -367,6 +368,13 @@ int32_t qwDropTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t t octx = *ctx; + atomic_store_ptr(&ctx->taskHandle, NULL); + atomic_store_ptr(&ctx->sinkHandle, NULL); + + if (rwType) { + QW_UNLOCK(rwType, &ctx->lock); + } + if (taosHashRemove(mgmt->ctxHash, id, sizeof(id))) { QW_TASK_ELOG_E("taosHashRemove from ctx hash failed"); QW_ERR_RET(TSDB_CODE_QRY_RES_CACHE_NOT_EXIST); @@ -439,7 +447,7 @@ _return: QW_RET(code); } -int32_t qwExecTask(QW_FPARAMS_DEF, qTaskInfo_t *taskHandle, DataSinkHandle sinkHandle, int8_t taskType) { +int32_t qwExecTask(QW_FPARAMS_DEF, qTaskInfo_t *taskHandle, DataSinkHandle sinkHandle, int8_t taskType, bool execOnce) { int32_t code = 0; bool qcontinue = true; SSDataBlock* pRes = NULL; @@ -463,6 +471,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, qTaskInfo_t *taskHandle, DataSinkHandle sinkH if (TASK_TYPE_TEMP == taskType) { qwFreeTaskHandle(QW_FPARAMS(), taskHandle); } + break; } @@ -475,7 +484,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, qTaskInfo_t *taskHandle, DataSinkHandle sinkH QW_TASK_DLOG("data put into sink, rows:%d, continueExecTask:%d", pRes->info.rows, qcontinue); - if (!qcontinue) { + if (execOnce || (!qcontinue)) { break; } } @@ -573,7 +582,6 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQ int8_t status = 0; SQWTaskCtx *ctx = NULL; bool locked = false; - bool ctxAcquired = false; void *readyConnection = NULL; void *dropConnection = NULL; void *cancelConnection = NULL; @@ -582,7 +590,9 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQ switch (phase) { case QW_PHASE_PRE_QUERY: { - QW_ERR_JRET(qwAddGetTaskCtx(QW_FPARAMS(), &ctx)); + QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), &ctx)); + + QW_LOCK(QW_WRITE, &ctx->lock); atomic_store_32(&ctx->phase, phase); atomic_store_8(&ctx->taskType, input->taskType); @@ -597,7 +607,7 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQ if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS())); - QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS())); + QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS(), QW_WRITE)); output->needStop = true; output->rspCode = TSDB_CODE_QRY_TASK_DROPPED; @@ -621,7 +631,7 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQ break; } case QW_PHASE_POST_QUERY: { - QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx)); + QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx)); QW_LOCK(QW_WRITE, &ctx->lock); @@ -641,8 +651,8 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQ if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { output->needStop = true; - QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS())); - QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS())); + QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS())); + QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS(), QW_WRITE)); output->rspCode = TSDB_CODE_QRY_TASK_DROPPED; dropConnection = ctx->dropConnection; @@ -665,13 +675,12 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQ } if (!output->needStop) { - QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), input->status)); + QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), input->taskStatus)); } break; } case QW_PHASE_PRE_FETCH: { QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), QW_READ, &ctx)); - ctxAcquired = true; QW_LOCK(QW_WRITE, &ctx->lock); @@ -686,6 +695,13 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQ QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED); } + if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { + QW_TASK_WLOG("task already dropped, phase:%d", phase); + output->needStop = true; + output->rspCode = TSDB_CODE_QRY_TASK_DROPPED; + QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); + } + if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { QW_TASK_ELOG("drop event at wrong phase, phase:%d", phase); output->needStop = true; @@ -721,7 +737,7 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQ break; } case QW_PHASE_POST_FETCH: { - QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx)); + QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx)); QW_LOCK(QW_WRITE, &ctx->lock); @@ -743,7 +759,7 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQ output->needStop = true; QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS())); - QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS())); + QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS(), QW_WRITE)); output->rspCode = TSDB_CODE_QRY_TASK_DROPPED; dropConnection = ctx->dropConnection; @@ -772,7 +788,6 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQ } case QW_PHASE_PRE_CQUERY: { QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), QW_READ, &ctx)); - ctxAcquired = true; QW_LOCK(QW_WRITE, &ctx->lock); @@ -808,7 +823,7 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQ break; } case QW_PHASE_POST_CQUERY: { - QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx)); + QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx)); QW_LOCK(QW_WRITE, &ctx->lock); @@ -829,8 +844,8 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQ QW_TASK_WLOG("start to drop task, phase:%d", phase); output->needStop = true; - QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS())); - QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS())); + QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS())); + QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS(), QW_WRITE)); output->rspCode = TSDB_CODE_QRY_TASK_DROPPED; dropConnection = ctx->dropConnection; @@ -871,8 +886,8 @@ _return: QW_UNLOCK(QW_WRITE, &ctx->lock); } - if (ctxAcquired && ctx) { - qwReleaseTaskCtx(QW_READ, mgmt); + if (ctx) { + qwReleaseTaskCtx(mgmt, ctx); } if (readyConnection) { @@ -896,7 +911,7 @@ _return: } -int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg, int8_t taskType) { +int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) { int32_t code = 0; bool queryRsped = false; bool needStop = false; @@ -944,7 +959,7 @@ int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t queryRsped = true; if (pTaskInfo && sinkHandle) { - QW_ERR_JRET(qwExecTask(QW_FPARAMS(), &pTaskInfo, sinkHandle, taskType)); + QW_ERR_JRET(qwExecTask(QW_FPARAMS(), &pTaskInfo, sinkHandle, taskType, true)); } _return: @@ -954,14 +969,8 @@ _return: } if (!queryRsped) { - code = qwBuildAndSendQueryRsp(qwMsg->connection, rspCode); - if (TSDB_CODE_SUCCESS == code) { - QW_TASK_DLOG("query msg rsped, code:%d", rspCode); - } - - if (TSDB_CODE_SUCCESS == rspCode && code) { - rspCode = code; - } + qwBuildAndSendQueryRsp(qwMsg->connection, rspCode); + QW_TASK_DLOG("query msg rsped, code:%x", rspCode); } if (needStop) { @@ -971,12 +980,7 @@ _return: input.code = rspCode; input.taskHandle = pTaskInfo; input.sinkHandle = sinkHandle; - - if (TSDB_CODE_SUCCESS != rspCode) { - input.status = JOB_TASK_STATUS_FAILED; - } else { - input.status = JOB_TASK_STATUS_PARTIAL_SUCCEED; - } + input.taskStatus = rspCode ? JOB_TASK_STATUS_FAILED : JOB_TASK_STATUS_PARTIAL_SUCCEED; QW_ERR_RET(qwHandleTaskEvent(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, &output)); @@ -989,8 +993,8 @@ int32_t qwProcessReady(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t int8_t phase = 0; bool needRsp = false; int32_t rspCode = 0; - - QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx)); + + QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx)); QW_LOCK(QW_WRITE, &ctx->lock); @@ -1020,6 +1024,7 @@ _return: if (ctx) { QW_UNLOCK(QW_WRITE, &ctx->lock); + qwReleaseTaskCtx(mgmt, ctx); } if (needRsp) { @@ -1059,7 +1064,7 @@ int32_t qwProcessCQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t DataSinkHandle sinkHandle = ctx->sinkHandle; - QW_ERR_JRET(qwExecTask(QW_FPARAMS(), &ctx->taskHandle, sinkHandle, ctx->taskType)); + QW_ERR_JRET(qwExecTask(QW_FPARAMS(), &ctx->taskHandle, sinkHandle, ctx->taskType, QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH))); if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { SOutputData sOutput = {0}; @@ -1191,7 +1196,7 @@ int32_t qwProcessDrop(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t t SQWTaskCtx *ctx = NULL; bool locked = false; - QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), QW_WRITE, &ctx)); + QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), &ctx)); QW_LOCK(QW_WRITE, &ctx->lock); @@ -1210,7 +1215,7 @@ int32_t qwProcessDrop(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t t ctx->dropConnection = qwMsg->connection; } else if (ctx->phase > 0) { QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS())); - QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS())); + QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS(), QW_WRITE)); locked = false; needRsp = true; @@ -1226,12 +1231,8 @@ _return: QW_SET_RSP_CODE(ctx, code); } - if (locked) { - QW_UNLOCK(QW_WRITE, &ctx->lock); - } - if (ctx) { - qwReleaseTaskCtx(QW_WRITE, mgmt); + qwReleaseTaskCtx(mgmt, ctx); } if (TSDB_CODE_SUCCESS != code || needRsp) { diff --git a/source/libs/qworker/src/qworkerMsg.c b/source/libs/qworker/src/qworkerMsg.c index feb8fd645efda3387d9de1661591a9cfb52abdec..0b1200745bc2983d92dc81eb98ee69cc3ab4e548 100644 --- a/source/libs/qworker/src/qworkerMsg.c +++ b/source/libs/qworker/src/qworkerMsg.c @@ -273,7 +273,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { - QW_ELOG("invalid query msg, contLen:%d", pMsg->contLen); + QW_ELOG("invalid query msg, msg:%p, msgLen:%d", msg, pMsg->contLen); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } @@ -307,7 +307,7 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { - QW_ELOG("invalid cquery msg, contLen:%d", pMsg->contLen); + QW_ELOG("invalid cquery msg, msg:%p, msgLen:%d", msg, pMsg->contLen); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } @@ -337,7 +337,7 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){ SResReadyReq *msg = pMsg->pCont; if (NULL == msg || pMsg->contLen < sizeof(*msg)) { - qError("invalid task status msg"); + QW_ELOG("invalid task ready msg, msg:%p, msgLen:%d", msg, pMsg->contLen); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } @@ -398,6 +398,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; if (NULL == msg || pMsg->contLen < sizeof(*msg)) { + QW_ELOG("invalid fetch msg, msg:%p, msgLen:%d", msg, pMsg->contLen); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); }