diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 0bca254e14a9548ff7ac7a0bd7b832bb73271cdd..1413b41756a2728955a54b9eb26bd224e948d47b 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -152,7 +152,7 @@ void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo); * @param tinfo qhandle * @return */ -int32_t qAsyncKillTask(qTaskInfo_t tinfo); +int32_t qAsyncKillTask(qTaskInfo_t tinfo, int32_t rspCode); /** * destroy query info structure diff --git a/include/util/taoserror.h b/include/util/taoserror.h index e5d0bcb249b1d0d76517506421ee95a87d873051..ba7c72e0f3a84b740c1aab0d7b414dfcf70e53b8 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -323,6 +323,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_VND_COL_NOT_EXISTS TAOS_DEF_ERROR_CODE(0, 0x0526) #define TSDB_CODE_VND_COL_SUBSCRIBED TAOS_DEF_ERROR_CODE(0, 0x0527) #define TSDB_CODE_VND_NO_AVAIL_BUFPOOL TAOS_DEF_ERROR_CODE(0, 0x0528) +#define TSDB_CODE_VND_STOPPED TAOS_DEF_ERROR_CODE(0, 0x0529) // tsdb #define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 816321703952c6643dff17f0f57617fba798e1ab..e18a6916ddfc1d11ee026315b1a1f86bf1f35b02 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -796,7 +796,7 @@ void setInputDataBlock(SExprSupp* pExprSupp, SSDataBlock* pBlock, int32_t order, int32_t checkForQueryBuf(size_t numOfTables); bool isTaskKilled(SExecTaskInfo* pTaskInfo); -void setTaskKilled(SExecTaskInfo* pTaskInfo); +void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode); void doDestroyTask(SExecTaskInfo* pTaskInfo); void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 34bd9cf8cac8f91ed6ebd9d8ba96d395c7372062..fd4dd3bd1276a58994a19559e9db83c70782d40c 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -688,7 +688,7 @@ void qStopTaskOperators(SExecTaskInfo* pTaskInfo) { taosWUnLockLatch(&pTaskInfo->stopInfo.lock); } -int32_t qAsyncKillTask(qTaskInfo_t qinfo) { +int32_t qAsyncKillTask(qTaskInfo_t qinfo, int32_t rspCode) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo; if (pTaskInfo == NULL) { @@ -697,7 +697,7 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo) { qDebug("%s execTask async killed", GET_TASKID(pTaskInfo)); - setTaskKilled(pTaskInfo); + setTaskKilled(pTaskInfo, rspCode); qStopTaskOperators(pTaskInfo); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 300ba529346e4e0b9d5c6c14d378a6995868d529..2766b48744f749b95ed1dc77d0fbeb724bee05d2 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -611,21 +611,10 @@ void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pB } bool isTaskKilled(SExecTaskInfo* pTaskInfo) { - // query has been executed more than tsShellActivityTimer, and the retrieve has not arrived - // abort current query execution. - if (pTaskInfo->owner != 0 && - ((taosGetTimestampSec() - pTaskInfo->cost.start / 1000) > 10 * getMaximumIdleDurationSec()) - /*(!needBuildResAfterQueryComplete(pTaskInfo))*/) { - assert(pTaskInfo->cost.start != 0); - // qDebug("QInfo:%" PRIu64 " retrieve not arrive beyond %d ms, abort current query execution, start:%" PRId64 - // ", current:%d", pQInfo->qId, 1, pQInfo->startExecTs, taosGetTimestampSec()); - // return true; - } - - return false; + return (0 != pTaskInfo->code) ? true : false; } -void setTaskKilled(SExecTaskInfo* pTaskInfo) { pTaskInfo->code = TSDB_CODE_TSC_QUERY_CANCELLED; } +void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode) { pTaskInfo->code = rspCode; } ///////////////////////////////////////////////////////////////////////////////////////////// STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key) { diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index a0e04b6a19bd16cb98415278ea8748406e1cfa89..af361323a7731f13512af85139d3ccdfd955ebf0 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -363,7 +363,7 @@ int32_t qwAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx); int32_t qwGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx); int32_t qwAddAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx); void qwReleaseTaskCtx(SQWorker *mgmt, void *ctx); -int32_t qwKillTaskHandle(SQWTaskCtx *ctx); +int32_t qwKillTaskHandle(SQWTaskCtx *ctx, int32_t rspCode); int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status); int32_t qwDropTask(QW_FPARAMS_DEF); void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx); diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index 2c0a4072aeb51179b172d8181f220198fa00534b..86fd1d533cb251904428d696b31bfb1e4716fe77 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -279,14 +279,14 @@ void qwFreeTaskHandle(qTaskInfo_t *taskHandle) { } } -int32_t qwKillTaskHandle(SQWTaskCtx *ctx) { +int32_t qwKillTaskHandle(SQWTaskCtx *ctx, int32_t rspCode) { int32_t code = 0; // Note: free/kill may in RC qTaskInfo_t taskHandle = atomic_load_ptr(&ctx->taskHandle); if (taskHandle && atomic_val_compare_exchange_ptr(&ctx->taskHandle, taskHandle, NULL)) { qDebug("start to kill task"); - code = qAsyncKillTask(taskHandle); + code = qAsyncKillTask(taskHandle, rspCode); atomic_store_ptr(&ctx->taskHandle, taskHandle); } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 0890d10b65728e1dadf84c9aea3640dc090d6aa0..9a318df324704291f83b6a69594c06b8823cd053 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -411,7 +411,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu // qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code); // QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code)); - QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); + QW_ERR_JRET(ctx->rspCode); } QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC)); @@ -420,7 +420,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu case QW_PHASE_PRE_FETCH: { if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP) || QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { QW_TASK_WLOG("task dropping or already dropped, phase:%s", qwPhaseStr(phase)); - QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); + QW_ERR_JRET(ctx->rspCode); } if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { @@ -442,7 +442,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu case QW_PHASE_PRE_CQUERY: { if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase)); - QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); + QW_ERR_JRET(ctx->rspCode); } if (ctx->rspCode) { @@ -456,7 +456,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu // qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code); // QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code)); - QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); + QW_ERR_JRET(ctx->rspCode); } break; @@ -502,7 +502,7 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase)); - QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); + QW_ERR_JRET(ctx->rspCode); } if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { @@ -515,7 +515,7 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp // QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code)); QW_ERR_JRET(qwDropTask(QW_FPARAMS())); - QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); + QW_ERR_JRET(ctx->rspCode); } if (ctx->rspCode) { @@ -861,7 +861,7 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { } if (QW_QUERY_RUNNING(ctx)) { - QW_ERR_JRET(qwKillTaskHandle(ctx)); + QW_ERR_JRET(qwKillTaskHandle(ctx, TSDB_CODE_TSC_QUERY_CANCELLED)); qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROP); } else { QW_ERR_JRET(qwDropTask(QW_FPARAMS())); @@ -869,6 +869,7 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { } if (!dropped) { + QW_UPDATE_RSP_CODE(ctx, TSDB_CODE_TSC_QUERY_CANCELLED); QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP); } @@ -1195,8 +1196,9 @@ void qWorkerStopAllTasks(void *qWorkerMgmt) { } if (QW_QUERY_RUNNING(ctx)) { - qwKillTaskHandle(ctx); + qwKillTaskHandle(ctx, TSDB_CODE_VND_STOPPED); } else if (!QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { + QW_UPDATE_RSP_CODE(ctx, TSDB_CODE_VND_STOPPED); QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP); } diff --git a/source/libs/qworker/test/qworkerTests.cpp b/source/libs/qworker/test/qworkerTests.cpp index 8a48977c777af8d794a38ed1721319c4b0646953..02b341e28c5684868e32354de199eb0be252b487 100644 --- a/source/libs/qworker/test/qworkerTests.cpp +++ b/source/libs/qworker/test/qworkerTests.cpp @@ -302,7 +302,7 @@ int32_t qwtExecTask(qTaskInfo_t tinfo, SSDataBlock **pRes, uint64_t *useconds) { return 0; } -int32_t qwtKillTask(qTaskInfo_t qinfo) { return 0; } +int32_t qwtKillTask(qTaskInfo_t qinfo, int32_t rspCode) { return 0; } void qwtDestroyTask(qTaskInfo_t qHandle) {} diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 4d889843e8d5812042dc5f85892d71b7d0fb1c21..35f59c4881dfd863292be035eaa043aa11ff3b34 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -315,6 +315,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_ALREADY_EXISTS, "Table column already TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_NOT_EXISTS, "Table column not exists") TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_SUBSCRIBED, "Table column is subscribed") TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_AVAIL_BUFPOOL, "No availabe buffer pool") +TAOS_DEFINE_ERROR(TSDB_CODE_VND_STOPPED, "Vnode stopped") // tsdb TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, "Invalid table ID")