提交 f1e37cde 编写于 作者: D dapan1121

fix: fix msg disorder issue

上级 865eb8ef
...@@ -135,7 +135,6 @@ typedef struct SQWTaskCtx { ...@@ -135,7 +135,6 @@ typedef struct SQWTaskCtx {
int32_t execId; int32_t execId;
bool queryRsped; bool queryRsped;
bool queryFetched;
bool queryEnd; bool queryEnd;
bool queryContinue; bool queryContinue;
bool queryInQueue; bool queryInQueue;
...@@ -228,6 +227,7 @@ typedef struct SQWorkerMgmt { ...@@ -228,6 +227,7 @@ typedef struct SQWorkerMgmt {
#define QW_SET_EVENT_PROCESSED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_PROCESSED) #define QW_SET_EVENT_PROCESSED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_PROCESSED)
#define QW_GET_PHASE(ctx) atomic_load_8(&(ctx)->phase) #define QW_GET_PHASE(ctx) atomic_load_8(&(ctx)->phase)
#define QW_SET_PHASE(ctx, _value) do { if ((_value) != QW_PHASE_PRE_FETCH && (_value) != QW_PHASE_POST_FETCH) { atomic_store_8(&(ctx)->phase, _value); } } while (0)
#define QW_SET_RSP_CODE(ctx, code) atomic_store_32(&(ctx)->rspCode, code) #define QW_SET_RSP_CODE(ctx, code) atomic_store_32(&(ctx)->rspCode, code)
#define QW_UPDATE_RSP_CODE(ctx, code) atomic_val_compare_exchange_32(&(ctx)->rspCode, 0, code) #define QW_UPDATE_RSP_CODE(ctx, code) atomic_val_compare_exchange_32(&(ctx)->rspCode, 0, code)
......
...@@ -166,7 +166,7 @@ int32_t qwDbgResponseREdirect(SQWMsg *qwMsg, SQWTaskCtx *ctx) { ...@@ -166,7 +166,7 @@ int32_t qwDbgResponseREdirect(SQWMsg *qwMsg, SQWTaskCtx *ctx) {
} }
if (TDMT_SCH_MERGE_QUERY == qwMsg->msgType) { if (TDMT_SCH_MERGE_QUERY == qwMsg->msgType) {
ctx->phase = QW_PHASE_POST_QUERY; QW_SET_PHASE(ctx, QW_PHASE_POST_QUERY);
qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, NULL); qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, NULL);
gQWDebug.tmp = false; gQWDebug.tmp = false;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -374,8 +374,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int ...@@ -374,8 +374,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
qwMsg.msgInfo.needFetch = msg->needFetch; qwMsg.msgInfo.needFetch = msg->needFetch;
char * sql = strndup(msg->msg, msg->sqlLen); char * sql = strndup(msg->msg, msg->sqlLen);
QW_SCH_TASK_DLOG("processQuery start, node:%p, type:%s, handle:%p, sql:%s", node, TMSG_INFO(pMsg->msgType), pMsg->info.handle, sql); QW_SCH_TASK_DLOG("processQuery start, node:%p, type:%s, handle:%p, SQL:%s", node, TMSG_INFO(pMsg->msgType), pMsg->info.handle, sql);
QW_ERR_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, sql)); QW_ERR_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, sql));
QW_SCH_TASK_DLOG("processQuery end, node:%p", node); QW_SCH_TASK_DLOG("processQuery end, node:%p", node);
......
...@@ -293,11 +293,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu ...@@ -293,11 +293,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
QW_LOCK(QW_WRITE, &ctx->lock); QW_LOCK(QW_WRITE, &ctx->lock);
if (QW_PHASE_PRE_FETCH == phase) { QW_SET_PHASE(ctx, phase);
atomic_store_8((int8_t *)&ctx->queryFetched, true);
} else {
atomic_store_8(&ctx->phase, phase);
}
if (atomic_load_8((int8_t *)&ctx->queryEnd)) { if (atomic_load_8((int8_t *)&ctx->queryEnd)) {
QW_TASK_ELOG_E("query already end"); QW_TASK_ELOG_E("query already end");
...@@ -370,6 +366,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu ...@@ -370,6 +366,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
} }
_return: _return:
if (ctx) { if (ctx) {
QW_UPDATE_RSP_CODE(ctx, code); QW_UPDATE_RSP_CODE(ctx, code);
...@@ -390,7 +387,6 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp ...@@ -390,7 +387,6 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp
int32_t code = 0; int32_t code = 0;
SQWTaskCtx *ctx = NULL; SQWTaskCtx *ctx = NULL;
SRpcHandleInfo connInfo = {0}; SRpcHandleInfo connInfo = {0};
SRpcHandleInfo *rspConnection = NULL;
QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase)); QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase));
...@@ -403,13 +399,6 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp ...@@ -403,13 +399,6 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
} }
if (QW_PHASE_POST_QUERY == phase) {
connInfo = ctx->ctrlConnInfo;
rspConnection = &connInfo;
ctx->queryRsped = true;
}
if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
if (QW_PHASE_POST_FETCH == phase) { if (QW_PHASE_POST_FETCH == phase) {
QW_TASK_WLOG("drop received at wrong phase %s", qwPhaseStr(phase)); QW_TASK_WLOG("drop received at wrong phase %s", qwPhaseStr(phase));
...@@ -437,17 +426,16 @@ _return: ...@@ -437,17 +426,16 @@ _return:
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_PART_SUCC); qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_PART_SUCC);
} }
if (rspConnection) { if (QW_PHASE_POST_QUERY == phase) {
qwBuildAndSendQueryRsp(input->msgType + 1, rspConnection, code, ctx); ctx->queryRsped = true;
QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", rspConnection->handle, code, tstrerror(code)); qwBuildAndSendQueryRsp(input->msgType + 1, &ctx->ctrlConnInfo, code, ctx);
QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
} }
if (ctx) { if (ctx) {
QW_UPDATE_RSP_CODE(ctx, code); QW_UPDATE_RSP_CODE(ctx, code);
if (QW_PHASE_POST_FETCH != phase) { QW_SET_PHASE(ctx, phase);
atomic_store_8(&ctx->phase, phase);
}
QW_UNLOCK(QW_WRITE, &ctx->lock); QW_UNLOCK(QW_WRITE, &ctx->lock);
qwReleaseTaskCtx(mgmt, ctx); qwReleaseTaskCtx(mgmt, ctx);
...@@ -634,8 +622,8 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { ...@@ -634,8 +622,8 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
QW_LOCK(QW_WRITE, &ctx->lock); QW_LOCK(QW_WRITE, &ctx->lock);
if (queryEnd || code || 0 == atomic_load_8((int8_t *)&ctx->queryContinue)) { if (queryEnd || code || 0 == atomic_load_8((int8_t *)&ctx->queryContinue)) {
// Note: if necessary, fetch need to put cquery to queue again // Note: query is not running anymore
atomic_store_8(&ctx->phase, 0); QW_SET_PHASE(ctx, 0);
QW_UNLOCK(QW_WRITE, &ctx->lock); QW_UNLOCK(QW_WRITE, &ctx->lock);
break; break;
} }
...@@ -722,7 +710,7 @@ _return: ...@@ -722,7 +710,7 @@ _return:
int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
int32_t code = 0; int32_t code = 0;
bool rsped = false; bool dropped = false;
SQWTaskCtx *ctx = NULL; SQWTaskCtx *ctx = NULL;
bool locked = false; bool locked = false;
...@@ -740,14 +728,14 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { ...@@ -740,14 +728,14 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
if (QW_QUERY_RUNNING(ctx)) { if (QW_QUERY_RUNNING(ctx)) {
QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx)); QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx));
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROP); qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROP);
} else if (ctx->phase > 0) { } else if (QW_GET_PHASE(ctx) > 0) {
QW_ERR_JRET(qwDropTask(QW_FPARAMS())); QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
rsped = true; dropped = true;
} else { } else {
// task not started // task not started
} }
if (!rsped) { if (!dropped) {
ctx->ctrlConnInfo = qwMsg->connInfo; ctx->ctrlConnInfo = qwMsg->connInfo;
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP); QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册