提交 fe9df727 编写于 作者: D dapan1121

fix: fetch rsp message type issue

上级 9caac472
...@@ -120,7 +120,8 @@ typedef struct SQWTaskCtx { ...@@ -120,7 +120,8 @@ typedef struct SQWTaskCtx {
int8_t explain; int8_t explain;
int8_t needFetch; int8_t needFetch;
int8_t localExec; int8_t localExec;
int32_t msgType; int32_t queryMsgType;
int32_t fetchMsgType;
int32_t level; int32_t level;
uint64_t sId; uint64_t sId;
......
...@@ -126,10 +126,10 @@ void qwDbgDumpTasksInfo(SQWorker *mgmt) { ...@@ -126,10 +126,10 @@ void qwDbgDumpTasksInfo(SQWorker *mgmt) {
void *key = taosHashGetKey(pIter, NULL); void *key = taosHashGetKey(pIter, NULL);
QW_GET_QTID(key, qId, tId, eId); QW_GET_QTID(key, qId, tId, eId);
QW_TASK_DLOG("%p lock:%x, phase:%d, type:%d, explain:%d, needFetch:%d, localExec:%d, msgType:%d, " QW_TASK_DLOG("%p lock:%x, phase:%d, type:%d, explain:%d, needFetch:%d, localExec:%d, queryMsgType:%d, "
"sId:%" PRId64 ", level:%d, queryGotData:%d, queryRsped:%d, queryEnd:%d, queryContinue:%d, queryInQueue:%d, " "sId:%" PRId64 ", level:%d, queryGotData:%d, queryRsped:%d, queryEnd:%d, queryContinue:%d, queryInQueue:%d, "
"rspCode:%x, affectedRows:%" PRId64 ", taskHandle:%p, sinkHandle:%p, tbFName:%s, sver:%d, tver:%d, events:%d,%d,%d,%d,%d", "rspCode:%x, affectedRows:%" PRId64 ", taskHandle:%p, sinkHandle:%p, tbFName:%s, sver:%d, tver:%d, events:%d,%d,%d,%d,%d",
ctx, ctx->lock, ctx->phase, ctx->taskType, ctx->explain, ctx->needFetch, ctx->localExec, ctx->msgType, ctx, ctx->lock, ctx->phase, ctx->taskType, ctx->explain, ctx->needFetch, ctx->localExec, ctx->queryMsgType,
ctx->sId, ctx->level, ctx->queryGotData, ctx->queryRsped, ctx->queryEnd, ctx->queryContinue, ctx->sId, ctx->level, ctx->queryGotData, ctx->queryRsped, ctx->queryEnd, ctx->queryContinue,
ctx->queryInQueue, ctx->rspCode, ctx->affectedRows, ctx->taskHandle, ctx->sinkHandle, ctx->tbInfo.tbFName, ctx->queryInQueue, ctx->rspCode, ctx->affectedRows, ctx->taskHandle, ctx->sinkHandle, ctx->tbInfo.tbFName,
ctx->tbInfo.sversion, ctx->tbInfo.tversion, ctx->events[QW_EVENT_CANCEL], ctx->events[QW_EVENT_READY], ctx->tbInfo.sversion, ctx->tbInfo.tversion, ctx->events[QW_EVENT_CANCEL], ctx->events[QW_EVENT_READY],
...@@ -259,9 +259,9 @@ void qwDbgSimulateDead(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *rsped) { ...@@ -259,9 +259,9 @@ void qwDbgSimulateDead(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *rsped) {
static int32_t ignoreTime = 0; static int32_t ignoreTime = 0;
if (++ignoreTime > 10 && 0 == taosRand() % 9) { if (++ignoreTime > 10 && 0 == taosRand() % 9) {
if (ctx->msgType == TDMT_SCH_FETCH) { if (ctx->fetchMsgType == TDMT_SCH_FETCH) {
qwBuildAndSendErrorRsp(TDMT_SCH_LINK_BROKEN, &ctx->ctrlConnInfo, TSDB_CODE_RPC_BROKEN_LINK); qwBuildAndSendErrorRsp(TDMT_SCH_LINK_BROKEN, &ctx->ctrlConnInfo, TSDB_CODE_RPC_BROKEN_LINK);
qwBuildAndSendErrorRsp(ctx->msgType + 1, &ctx->dataConnInfo, TSDB_CODE_QRY_TASK_CTX_NOT_EXIST); qwBuildAndSendErrorRsp(ctx->fetchMsgType + 1, &ctx->dataConnInfo, TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
*rsped = true; *rsped = true;
taosSsleep(3); taosSsleep(3);
......
...@@ -464,7 +464,7 @@ int32_t qwQuickRspFetchReq(QW_FPARAMS_DEF, SQWTaskCtx * ctx, SQWMsg *qwMsg, i ...@@ -464,7 +464,7 @@ int32_t qwQuickRspFetchReq(QW_FPARAMS_DEF, SQWTaskCtx * ctx, SQWMsg *qwMsg, i
qwMsg->connInfo = ctx->dataConnInfo; qwMsg->connInfo = ctx->dataConnInfo;
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH); QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
qwBuildAndSendFetchRsp(ctx->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code); qwBuildAndSendFetchRsp(ctx->fetchMsgType + 1, &qwMsg->connInfo, rsp, dataLen, code);
rsp = NULL; rsp = NULL;
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code),
...@@ -626,7 +626,7 @@ _return: ...@@ -626,7 +626,7 @@ _return:
if (QW_PHASE_POST_QUERY == phase && ctx && !ctx->queryRsped) { if (QW_PHASE_POST_QUERY == phase && ctx && !ctx->queryRsped) {
bool rsped = false; bool rsped = false;
SQWMsg qwMsg = {.msgType = ctx->msgType, .connInfo = ctx->ctrlConnInfo}; SQWMsg qwMsg = {.msgType = ctx->queryMsgType, .connInfo = ctx->ctrlConnInfo};
qwDbgSimulateRedirect(&qwMsg, ctx, &rsped); qwDbgSimulateRedirect(&qwMsg, ctx, &rsped);
qwDbgSimulateDead(QW_FPARAMS(), ctx, &rsped); qwDbgSimulateDead(QW_FPARAMS(), ctx, &rsped);
if (!rsped) { if (!rsped) {
...@@ -704,7 +704,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) { ...@@ -704,7 +704,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) {
ctx->taskType = qwMsg->msgInfo.taskType; ctx->taskType = qwMsg->msgInfo.taskType;
ctx->explain = qwMsg->msgInfo.explain; ctx->explain = qwMsg->msgInfo.explain;
ctx->needFetch = qwMsg->msgInfo.needFetch; ctx->needFetch = qwMsg->msgInfo.needFetch;
ctx->msgType = qwMsg->msgType; ctx->queryMsgType = qwMsg->msgType;
ctx->localExec = false; ctx->localExec = false;
// QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg); // QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg);
...@@ -793,7 +793,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { ...@@ -793,7 +793,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
qwMsg->connInfo = ctx->dataConnInfo; qwMsg->connInfo = ctx->dataConnInfo;
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH); QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
qwBuildAndSendFetchRsp(ctx->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code); qwBuildAndSendFetchRsp(ctx->fetchMsgType + 1, &qwMsg->connInfo, rsp, dataLen, code);
rsp = NULL; rsp = NULL;
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code,
...@@ -815,7 +815,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { ...@@ -815,7 +815,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
rsp = NULL; rsp = NULL;
qwMsg->connInfo = ctx->dataConnInfo; qwMsg->connInfo = ctx->dataConnInfo;
qwBuildAndSendFetchRsp(ctx->msgType + 1, &qwMsg->connInfo, NULL, 0, code); qwBuildAndSendFetchRsp(ctx->fetchMsgType + 1, &qwMsg->connInfo, NULL, 0, code);
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code),
0); 0);
} }
...@@ -849,7 +849,7 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) { ...@@ -849,7 +849,7 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx)); QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
ctx->msgType = qwMsg->msgType; ctx->fetchMsgType = qwMsg->msgType;
ctx->dataConnInfo = qwMsg->connInfo; ctx->dataConnInfo = qwMsg->connInfo;
SOutputData sOutput = {0}; SOutputData sOutput = {0};
...@@ -1328,7 +1328,7 @@ int32_t qWorkerProcessLocalQuery(void *pMgmt, uint64_t sId, uint64_t qId, uint64 ...@@ -1328,7 +1328,7 @@ int32_t qWorkerProcessLocalQuery(void *pMgmt, uint64_t sId, uint64_t qId, uint64
ctx->taskType = qwMsg->msgInfo.taskType; ctx->taskType = qwMsg->msgInfo.taskType;
ctx->explain = qwMsg->msgInfo.explain; ctx->explain = qwMsg->msgInfo.explain;
ctx->needFetch = qwMsg->msgInfo.needFetch; ctx->needFetch = qwMsg->msgInfo.needFetch;
ctx->msgType = qwMsg->msgType; ctx->queryMsgType = qwMsg->msgType;
ctx->localExec = true; ctx->localExec = true;
ctx->explainRes = explainRes; ctx->explainRes = explainRes;
...@@ -1383,7 +1383,7 @@ int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64 ...@@ -1383,7 +1383,7 @@ int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx)); QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
ctx->msgType = TDMT_SCH_MERGE_FETCH; ctx->fetchMsgType = TDMT_SCH_MERGE_FETCH;
ctx->explainRes = explainRes; ctx->explainRes = explainRes;
SOutputData sOutput = {0}; SOutputData sOutput = {0};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册