提交 75338f80 编写于 作者: D dapan1121

feature/qnode

上级 64d3ab2e
...@@ -254,6 +254,19 @@ int32_t qwAcquireTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_ ...@@ -254,6 +254,19 @@ int32_t qwAcquireTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwGetTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWTaskCtx **ctx) {
char id[sizeof(qId) + sizeof(tId)] = {0};
QW_SET_QTID(id, qId, tId);
*ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id));
if (NULL == (*ctx)) {
QW_TASK_ELOG("ctx not in ctxHash, id:%s", id);
QW_ERR_RET(TSDB_CODE_QRY_RES_CACHE_NOT_EXIST);
}
return TSDB_CODE_SUCCESS;
}
int32_t qwAddTaskCtxImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, int32_t status, SQWTaskCtx **ctx) { int32_t qwAddTaskCtxImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, int32_t status, SQWTaskCtx **ctx) {
char id[sizeof(qId) + sizeof(tId)] = {0}; char id[sizeof(qId) + sizeof(tId)] = {0};
QW_SET_QTID(id, qId, tId); QW_SET_QTID(id, qId, tId);
...@@ -294,19 +307,6 @@ int32_t qwAddTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tI ...@@ -294,19 +307,6 @@ int32_t qwAddTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tI
QW_RET(qwAddTaskCtxImpl(QW_FPARAMS(), 0, 0, NULL)); QW_RET(qwAddTaskCtxImpl(QW_FPARAMS(), 0, 0, NULL));
} }
int32_t qwGetTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWTaskCtx **ctx) {
char id[sizeof(qId) + sizeof(tId)] = {0};
QW_SET_QTID(id, qId, tId);
*ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id));
if (NULL == (*ctx)) {
QW_TASK_ELOG("ctx not in ctxHash, id:%s", id);
QW_ERR_RET(TSDB_CODE_QRY_RES_CACHE_NOT_EXIST);
}
return TSDB_CODE_SUCCESS;
}
int32_t qwAddAcquireTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, SQWTaskCtx **ctx) { int32_t qwAddAcquireTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, SQWTaskCtx **ctx) {
...@@ -322,16 +322,33 @@ void qwReleaseTaskCtx(int32_t rwType, SQWorkerMgmt *mgmt) { ...@@ -322,16 +322,33 @@ void qwReleaseTaskCtx(int32_t rwType, SQWorkerMgmt *mgmt) {
QW_UNLOCK(rwType, &mgmt->ctxLock); QW_UNLOCK(rwType, &mgmt->ctxLock);
} }
void qwFreeTaskHandle(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
// RC WARNING
qTaskInfo_t taskHandle = atomic_load_ptr(&ctx->taskHandle);
if (taskHandle && atomic_val_compare_exchange_ptr(&ctx->taskHandle, taskHandle, NULL)) {
qDestroyTask(taskHandle);
}
}
void qwFreeTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWTaskCtx *ctx) { int32_t qwKillTaskHandle(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
if (ctx->taskHandle) { int32_t code = 0;
qDestroyTask(ctx->taskHandle); // RC WARNING
ctx->taskHandle = NULL; qTaskInfo_t taskHandle = atomic_load_ptr(&ctx->taskHandle);
if (taskHandle && atomic_val_compare_exchange_ptr(&ctx->taskHandle, taskHandle, NULL)) {
code = qKillTask(taskHandle);
atomic_store_ptr(&ctx->taskHandle, taskHandle);
} }
// TODO QW_RET(code);
if (ctx->sinkHandle) { }
void qwFreeTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
qwFreeTaskHandle(QW_FPARAMS(), ctx);
if (ctx->sinkHandle) {
dsDestroyDataSinker(ctx->sinkHandle);
ctx->sinkHandle = NULL;
} }
} }
...@@ -425,7 +442,7 @@ int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, ...@@ -425,7 +442,7 @@ int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId,
SQWTaskCtx *ctx = NULL; SQWTaskCtx *ctx = NULL;
bool locked = false; bool locked = false;
QW_ERR_JRET(qwAddGetTaskCtx(QW_FPARAMS(), QW_READ, &ctx)); QW_ERR_JRET(qwAddGetTaskCtx(QW_FPARAMS(), &ctx));
QW_LOCK(QW_WRITE, &ctx->lock); QW_LOCK(QW_WRITE, &ctx->lock);
...@@ -437,9 +454,7 @@ int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, ...@@ -437,9 +454,7 @@ int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId,
} }
if (QW_IN_EXECUTOR(ctx)) { if (QW_IN_EXECUTOR(ctx)) {
if (ctx->taskHandle) { QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx));
QW_ERR_JRET(qKillTask(ctx->taskHandle));
}
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROPPING)); QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROPPING));
} else if (ctx->phase > 0) { } else if (ctx->phase > 0) {
...@@ -587,7 +602,7 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S ...@@ -587,7 +602,7 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S
switch (phase) { switch (phase) {
case QW_PHASE_PRE_QUERY: { case QW_PHASE_PRE_QUERY: {
QW_ERR_JRET(qwAddGetTaskCtx(QW_FPARAMS(), QW_READ, &ctx)); QW_ERR_JRET(qwAddGetTaskCtx(QW_FPARAMS(), &ctx));
ctx->phase = phase; ctx->phase = phase;
...@@ -607,7 +622,6 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S ...@@ -607,7 +622,6 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S
output->needStop = true; output->needStop = true;
QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_CANCELLED)); QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_CANCELLED));
qwFreeTask(QW_FPARAMS(), ctx);
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL); QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL);
...@@ -620,7 +634,7 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S ...@@ -620,7 +634,7 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S
break; break;
} }
case QW_PHASE_POST_QUERY: { case QW_PHASE_POST_QUERY: {
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), QW_READ, &ctx)); QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
QW_LOCK(QW_WRITE, &ctx->lock); QW_LOCK(QW_WRITE, &ctx->lock);
...@@ -702,7 +716,7 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S ...@@ -702,7 +716,7 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S
break; break;
} }
case QW_PHASE_POST_CQUERY: { case QW_PHASE_POST_CQUERY: {
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), QW_READ, &ctx)); QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
QW_LOCK(QW_WRITE, &ctx->lock); QW_LOCK(QW_WRITE, &ctx->lock);
...@@ -738,7 +752,7 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S ...@@ -738,7 +752,7 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S
break; break;
} }
case QW_PHASE_PRE_FETCH: { case QW_PHASE_PRE_FETCH: {
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), QW_READ, &ctx)); QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
QW_LOCK(QW_WRITE, &ctx->lock); QW_LOCK(QW_WRITE, &ctx->lock);
...@@ -786,7 +800,7 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S ...@@ -786,7 +800,7 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S
break; break;
} }
case QW_PHASE_POST_FETCH: { case QW_PHASE_POST_FETCH: {
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), QW_READ, &ctx)); QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
QW_LOCK(QW_WRITE, &ctx->lock); QW_LOCK(QW_WRITE, &ctx->lock);
...@@ -983,7 +997,8 @@ int32_t qwProcessCQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t ...@@ -983,7 +997,8 @@ int32_t qwProcessCQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) { if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) {
QW_TASK_DLOG("task not end, need to continue, bufStatus:%d", sOutput.bufStatus); QW_TASK_DLOG("task not end, need to continue, bufStatus:%d", sOutput.bufStatus);
// RC WARNING
atomic_store_8(&ctx->queryContinue, 1); atomic_store_8(&ctx->queryContinue, 1);
} }
...@@ -1061,6 +1076,7 @@ int32_t qwProcessFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t ...@@ -1061,6 +1076,7 @@ int32_t qwProcessFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
QW_LOCK(QW_WRITE, &ctx->lock); QW_LOCK(QW_WRITE, &ctx->lock);
locked = true; locked = true;
// RC WARNING
if (QW_IN_EXECUTOR(ctx)) { if (QW_IN_EXECUTOR(ctx)) {
atomic_store_8(&ctx->queryContinue, 1); atomic_store_8(&ctx->queryContinue, 1);
} else if (0 == atomic_load_8(&ctx->inQueue)) { } else if (0 == atomic_load_8(&ctx->inQueue)) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册