From f1e37cde780b182fce99a22be55c617fae7eb403 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 15 Jul 2022 10:21:47 +0800 Subject: [PATCH] fix: fix msg disorder issue --- source/libs/qworker/inc/qwInt.h | 2 +- source/libs/qworker/src/qwDbg.c | 2 +- source/libs/qworker/src/qwMsg.c | 3 +-- source/libs/qworker/src/qworker.c | 38 +++++++++++-------------------- 4 files changed, 16 insertions(+), 29 deletions(-) diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 539643c390..6c9871425b 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -135,7 +135,6 @@ typedef struct SQWTaskCtx { int32_t execId; bool queryRsped; - bool queryFetched; bool queryEnd; bool queryContinue; bool queryInQueue; @@ -228,6 +227,7 @@ typedef struct SQWorkerMgmt { #define QW_SET_EVENT_PROCESSED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_PROCESSED) #define QW_GET_PHASE(ctx) atomic_load_8(&(ctx)->phase) +#define QW_SET_PHASE(ctx, _value) do { if ((_value) != QW_PHASE_PRE_FETCH && (_value) != QW_PHASE_POST_FETCH) { atomic_store_8(&(ctx)->phase, _value); } } while (0) #define QW_SET_RSP_CODE(ctx, code) atomic_store_32(&(ctx)->rspCode, code) #define QW_UPDATE_RSP_CODE(ctx, code) atomic_val_compare_exchange_32(&(ctx)->rspCode, 0, code) diff --git a/source/libs/qworker/src/qwDbg.c b/source/libs/qworker/src/qwDbg.c index 869eedf8f6..66a89cf57a 100644 --- a/source/libs/qworker/src/qwDbg.c +++ b/source/libs/qworker/src/qwDbg.c @@ -166,7 +166,7 @@ int32_t qwDbgResponseREdirect(SQWMsg *qwMsg, SQWTaskCtx *ctx) { } if (TDMT_SCH_MERGE_QUERY == qwMsg->msgType) { - ctx->phase = QW_PHASE_POST_QUERY; + QW_SET_PHASE(ctx, QW_PHASE_POST_QUERY); qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, NULL); gQWDebug.tmp = false; return TSDB_CODE_SUCCESS; diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index 73110472f7..e1f16b9719 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -374,8 +374,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int qwMsg.msgInfo.needFetch = msg->needFetch; char * sql = strndup(msg->msg, msg->sqlLen); - QW_SCH_TASK_DLOG("processQuery start, node:%p, type:%s, handle:%p, sql:%s", node, TMSG_INFO(pMsg->msgType), pMsg->info.handle, sql); - + QW_SCH_TASK_DLOG("processQuery start, node:%p, type:%s, handle:%p, SQL:%s", node, TMSG_INFO(pMsg->msgType), pMsg->info.handle, sql); QW_ERR_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, sql)); QW_SCH_TASK_DLOG("processQuery end, node:%p", node); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 3e8ced318c..fb8ce09615 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -293,11 +293,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu QW_LOCK(QW_WRITE, &ctx->lock); - if (QW_PHASE_PRE_FETCH == phase) { - atomic_store_8((int8_t *)&ctx->queryFetched, true); - } else { - atomic_store_8(&ctx->phase, phase); - } + QW_SET_PHASE(ctx, phase); if (atomic_load_8((int8_t *)&ctx->queryEnd)) { QW_TASK_ELOG_E("query already end"); @@ -370,6 +366,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu } _return: + if (ctx) { QW_UPDATE_RSP_CODE(ctx, code); @@ -390,7 +387,6 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp int32_t code = 0; SQWTaskCtx *ctx = NULL; SRpcHandleInfo connInfo = {0}; - SRpcHandleInfo *rspConnection = NULL; QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase)); @@ -403,13 +399,6 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); } - if (QW_PHASE_POST_QUERY == phase) { - connInfo = ctx->ctrlConnInfo; - rspConnection = &connInfo; - - ctx->queryRsped = true; - } - if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { if (QW_PHASE_POST_FETCH == phase) { QW_TASK_WLOG("drop received at wrong phase %s", qwPhaseStr(phase)); @@ -437,17 +426,16 @@ _return: qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_PART_SUCC); } - if (rspConnection) { - qwBuildAndSendQueryRsp(input->msgType + 1, rspConnection, code, ctx); - QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", rspConnection->handle, code, tstrerror(code)); + if (QW_PHASE_POST_QUERY == phase) { + ctx->queryRsped = true; + qwBuildAndSendQueryRsp(input->msgType + 1, &ctx->ctrlConnInfo, code, ctx); + QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code)); } if (ctx) { QW_UPDATE_RSP_CODE(ctx, code); - if (QW_PHASE_POST_FETCH != phase) { - atomic_store_8(&ctx->phase, phase); - } + QW_SET_PHASE(ctx, phase); QW_UNLOCK(QW_WRITE, &ctx->lock); qwReleaseTaskCtx(mgmt, ctx); @@ -634,8 +622,8 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { QW_LOCK(QW_WRITE, &ctx->lock); if (queryEnd || code || 0 == atomic_load_8((int8_t *)&ctx->queryContinue)) { - // Note: if necessary, fetch need to put cquery to queue again - atomic_store_8(&ctx->phase, 0); + // Note: query is not running anymore + QW_SET_PHASE(ctx, 0); QW_UNLOCK(QW_WRITE, &ctx->lock); break; } @@ -722,7 +710,7 @@ _return: int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { int32_t code = 0; - bool rsped = false; + bool dropped = false; SQWTaskCtx *ctx = NULL; bool locked = false; @@ -740,14 +728,14 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { if (QW_QUERY_RUNNING(ctx)) { QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx)); qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROP); - } else if (ctx->phase > 0) { + } else if (QW_GET_PHASE(ctx) > 0) { QW_ERR_JRET(qwDropTask(QW_FPARAMS())); - rsped = true; + dropped = true; } else { // task not started } - if (!rsped) { + if (!dropped) { ctx->ctrlConnInfo = qwMsg->connInfo; QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP); -- GitLab