提交 d12614aa 编写于 作者: D dapan1121

kill query

上级 b3f9f81b
...@@ -73,13 +73,14 @@ typedef void (*schedulerExecCallback)(SQueryResult* pResult, void* param, int32_ ...@@ -73,13 +73,14 @@ typedef void (*schedulerExecCallback)(SQueryResult* pResult, void* param, int32_
typedef void (*schedulerFetchCallback)(void* pResult, void* param, int32_t code); typedef void (*schedulerFetchCallback)(void* pResult, void* param, int32_t code);
typedef struct SSchedulerReq { typedef struct SSchedulerReq {
SRequestConnInfo *pConn; bool *reqKilled;
SArray *pNodeList; SRequestConnInfo *pConn;
SQueryPlan *pDag; SArray *pNodeList;
const char *sql; SQueryPlan *pDag;
int64_t startTs; const char *sql;
int64_t startTs;
schedulerExecCallback fp; schedulerExecCallback fp;
void* cbParam; void* cbParam;
} SSchedulerReq; } SSchedulerReq;
......
...@@ -461,7 +461,8 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList ...@@ -461,7 +461,8 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
.sql = pRequest->sqlstr, .sql = pRequest->sqlstr,
.startTs = pRequest->metric.start, .startTs = pRequest->metric.start,
.fp = NULL, .fp = NULL,
.cbParam = NULL}; .cbParam = NULL,
.reqKilled = &pRequest->killed};
int32_t code = schedulerExecJob(&req, &pRequest->body.queryJob, &res); int32_t code = schedulerExecJob(&req, &pRequest->body.queryJob, &res);
pRequest->body.resInfo.execRes = res.res; pRequest->body.resInfo.execRes = res.res;
...@@ -738,7 +739,8 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery) { ...@@ -738,7 +739,8 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery) {
.sql = pRequest->sqlstr, .sql = pRequest->sqlstr,
.startTs = pRequest->metric.start, .startTs = pRequest->metric.start,
.fp = schedulerExecCb, .fp = schedulerExecCb,
.cbParam = pRequest}; .cbParam = pRequest,
.reqKilled = &pRequest->killed};
code = schedulerAsyncExecJob(&req, &pRequest->body.queryJob); code = schedulerAsyncExecJob(&req, &pRequest->body.queryJob);
} else { } else {
tscError("0x%" PRIx64 " failed to create query plan, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code), tscError("0x%" PRIx64 " failed to create query plan, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
......
...@@ -228,6 +228,7 @@ typedef struct SSchJob { ...@@ -228,6 +228,7 @@ typedef struct SSchJob {
SQueryNodeAddr resNode; SQueryNodeAddr resNode;
tsem_t rspSem; tsem_t rspSem;
SSchOpStatus opStatus; SSchOpStatus opStatus;
bool *reqKilled;
SSchTask *fetchTask; SSchTask *fetchTask;
int32_t errCode; int32_t errCode;
SRWLatch resLock; SRWLatch resLock;
......
...@@ -54,6 +54,7 @@ int32_t schInitJob(SSchedulerReq *pReq, SSchJob **pSchJob, SQueryResult* pRes, b ...@@ -54,6 +54,7 @@ int32_t schInitJob(SSchedulerReq *pReq, SSchJob **pSchJob, SQueryResult* pRes, b
pJob->attr.explainMode = pReq->pDag->explainInfo.mode; pJob->attr.explainMode = pReq->pDag->explainInfo.mode;
pJob->conn = *pReq->pConn; pJob->conn = *pReq->pConn;
pJob->sql = pReq->sql; pJob->sql = pReq->sql;
pJob->reqKilled = pReq->reqKilled;
pJob->userRes.queryRes = pRes; pJob->userRes.queryRes = pRes;
pJob->userRes.execFp = pReq->fp; pJob->userRes.execFp = pReq->fp;
pJob->userRes.userParam = pReq->cbParam; pJob->userRes.userParam = pReq->cbParam;
...@@ -154,12 +155,52 @@ void schFreeTask(SSchJob *pJob, SSchTask *pTask) { ...@@ -154,12 +155,52 @@ void schFreeTask(SSchJob *pJob, SSchTask *pTask) {
} }
} }
void schUpdateJobErrCode(SSchJob *pJob, int32_t errCode) {
if (TSDB_CODE_SUCCESS == errCode) {
return;
}
int32_t origCode = atomic_load_32(&pJob->errCode);
if (TSDB_CODE_SUCCESS == origCode) {
if (origCode == atomic_val_compare_exchange_32(&pJob->errCode, origCode, errCode)) {
goto _return;
}
origCode = atomic_load_32(&pJob->errCode);
}
if (NEED_CLIENT_HANDLE_ERROR(origCode)) {
return;
}
if (NEED_CLIENT_HANDLE_ERROR(errCode)) {
atomic_store_32(&pJob->errCode, errCode);
goto _return;
}
return;
_return:
SCH_JOB_DLOG("job errCode updated to %x - %s", errCode, tstrerror(errCode));
}
FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) { FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) {
int8_t status = SCH_GET_JOB_STATUS(pJob); int8_t status = SCH_GET_JOB_STATUS(pJob);
if (pStatus) { if (pStatus) {
*pStatus = status; *pStatus = status;
} }
if (pJob->reqKilled) {
schUpdateJobStatus(pJob, JOB_TASK_STATUS_DROPPING);
schUpdateJobErrCode(pJob, TSDB_CODE_TSC_QUERY_KILLED);
return true;
}
return (status == JOB_TASK_STATUS_FAILED || status == JOB_TASK_STATUS_DROPPING || return (status == JOB_TASK_STATUS_FAILED || status == JOB_TASK_STATUS_DROPPING ||
status == JOB_TASK_STATUS_SUCCEED); status == JOB_TASK_STATUS_SUCCEED);
} }
...@@ -255,7 +296,13 @@ void schEndOperation(SSchJob *pJob) { ...@@ -255,7 +296,13 @@ void schEndOperation(SSchJob *pJob) {
int32_t schBeginOperation(SSchJob *pJob, SCH_OP_TYPE type, bool sync) { int32_t schBeginOperation(SSchJob *pJob, SCH_OP_TYPE type, bool sync) {
int32_t code = 0; int32_t code = 0;
int8_t status = 0;
if (schJobNeedToStop(pJob, &status)) {
SCH_JOB_ELOG("job need to stop cause of status %s", jobTaskStatusStr(status));
SCH_ERR_JRET(pJob->errCode);
}
if (SCH_OP_NULL != atomic_val_compare_exchange_32(&pJob->opStatus.op, SCH_OP_NULL, type)) { if (SCH_OP_NULL != atomic_val_compare_exchange_32(&pJob->opStatus.op, SCH_OP_NULL, type)) {
SCH_JOB_ELOG("job already in %s operation", schGetOpStr(pJob->opStatus.op)); SCH_JOB_ELOG("job already in %s operation", schGetOpStr(pJob->opStatus.op));
SCH_ERR_JRET(TSDB_CODE_TSC_APP_ERROR); SCH_ERR_JRET(TSDB_CODE_TSC_APP_ERROR);
...@@ -275,11 +322,7 @@ int32_t schBeginOperation(SSchJob *pJob, SCH_OP_TYPE type, bool sync) { ...@@ -275,11 +322,7 @@ int32_t schBeginOperation(SSchJob *pJob, SCH_OP_TYPE type, bool sync) {
SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
} }
int8_t status = 0; if (status != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
if (schJobNeedToStop(pJob, &status)) {
SCH_JOB_ELOG("job need to stop cause of status %s", jobTaskStatusStr(status));
SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
} else if (status != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
SCH_JOB_ELOG("job status error for fetch, status:%s", jobTaskStatusStr(status)); SCH_JOB_ELOG("job status error for fetch, status:%s", jobTaskStatusStr(status));
SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR); SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
} }
...@@ -841,37 +884,6 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) { ...@@ -841,37 +884,6 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void schUpdateJobErrCode(SSchJob *pJob, int32_t errCode) {
if (TSDB_CODE_SUCCESS == errCode) {
return;
}
int32_t origCode = atomic_load_32(&pJob->errCode);
if (TSDB_CODE_SUCCESS == origCode) {
if (origCode == atomic_val_compare_exchange_32(&pJob->errCode, origCode, errCode)) {
goto _return;
}
origCode = atomic_load_32(&pJob->errCode);
}
if (NEED_CLIENT_HANDLE_ERROR(origCode)) {
return;
}
if (NEED_CLIENT_HANDLE_ERROR(errCode)) {
atomic_store_32(&pJob->errCode, errCode);
goto _return;
}
return;
_return:
SCH_JOB_DLOG("job errCode updated to %x - %s", errCode, tstrerror(errCode));
}
int32_t schSetJobQueryRes(SSchJob* pJob, SQueryResult* pRes) { int32_t schSetJobQueryRes(SSchJob* pJob, SQueryResult* pRes) {
pRes->code = atomic_load_32(&pJob->errCode); pRes->code = atomic_load_32(&pJob->errCode);
pRes->numOfRows = pJob->resNumOfRows; pRes->numOfRows = pJob->resNumOfRows;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册