提交 dce0f06e 编写于 作者: D dapan1121

fix: fix redirect issue

上级 e55f01df
......@@ -42,6 +42,7 @@
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
#include "qworker.h"
static void *mndBuildTimerMsg(int32_t *pContLen) {
SMTimerReq timerReq = {0};
......@@ -375,6 +376,7 @@ void mndPreClose(SMnode *pMnode) {
void mndClose(SMnode *pMnode) {
if (pMnode != NULL) {
mDebug("start to close mnode");
qWorkerDestroy((void **)&pMnode->pQuery);
mndCleanupSteps(pMnode, -1);
taosMemoryFreeClear(pMnode->path);
taosMemoryFreeClear(pMnode);
......
......@@ -132,7 +132,7 @@ typedef struct SQWTaskCtx {
int8_t taskType;
int8_t explain;
int8_t needFetch;
int32_t queryType;
int32_t msgType;
int32_t fetchType;
int32_t execId;
......@@ -380,7 +380,7 @@ void qwDbgDumpMgmtInfo(SQWorker *mgmt);
int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore);
int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SEpSet *pEpSet);
int32_t qwAddTaskCtx(QW_FPARAMS_DEF);
int32_t qwDbgResponseRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx);
int32_t qwDbgResponseRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx, bool *rsped);
#ifdef __cplusplus
......
......@@ -147,7 +147,7 @@ int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int
return TSDB_CODE_SUCCESS;
}
int32_t qwDbgResponseRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx) {
int32_t qwDbgResponseRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx, bool *rsped) {
if (gQWDebug.tmp) {
if (TDMT_SCH_QUERY == qwMsg->msgType && (0 == taosRand() % 3)) {
SEpSet epSet = {0};
......@@ -162,16 +162,25 @@ int32_t qwDbgResponseRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx) {
ctx->phase = QW_PHASE_POST_QUERY;
qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, &epSet);
*rsped = true;
return TSDB_CODE_SUCCESS;
}
if (TDMT_SCH_MERGE_QUERY == qwMsg->msgType && (0 == taosRand() % 3)) {
QW_SET_PHASE(ctx, QW_PHASE_POST_QUERY);
qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, NULL);
*rsped = true;
return TSDB_CODE_SUCCESS;
}
if ((TDMT_SCH_FETCH == qwMsg->msgType) && (0 == taosRand() % 3)) {
qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, NULL);
*rsped = true;
return TSDB_CODE_SUCCESS;
}
}
*rsped = false;
return TSDB_CODE_SUCCESS;
}
......
......@@ -428,7 +428,14 @@ _return:
if (QW_PHASE_POST_QUERY == phase && ctx) {
ctx->queryRsped = true;
qwBuildAndSendQueryRsp(input->msgType + 1, &ctx->ctrlConnInfo, code, ctx);
bool rsped = false;
SQWMsg qwMsg = {.msgType = ctx->msgType, .connInfo = ctx->ctrlConnInfo};
qwDbgResponseRedirect(&qwMsg, ctx, &rsped);
if (!rsped) {
qwBuildAndSendQueryRsp(input->msgType + 1, &ctx->ctrlConnInfo, code, ctx);
}
QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
}
......@@ -476,8 +483,6 @@ int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT));
qwDbgResponseRedirect(qwMsg, ctx);
_return:
if (ctx) {
......@@ -505,7 +510,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, const char* sql) {
ctx->taskType = qwMsg->msgInfo.taskType;
ctx->explain = qwMsg->msgInfo.explain;
ctx->needFetch = qwMsg->msgInfo.needFetch;
ctx->queryType = qwMsg->msgType;
ctx->msgType = qwMsg->msgType;
//QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg);
......@@ -650,7 +655,7 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
ctx->queryType = qwMsg->msgType;
ctx->msgType = qwMsg->msgType;
SOutputData sOutput = {0};
QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));
......@@ -702,7 +707,11 @@ _return:
}
if (code || rsp) {
qwBuildAndSendFetchRsp(qwMsg->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code);
bool rsped = false;
qwDbgResponseRedirect(qwMsg, ctx, &rsped);
if (!rsped) {
qwBuildAndSendFetchRsp(qwMsg->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code);
}
QW_TASK_DLOG("%s send, handle:%p, code:%x - %s, dataLen:%d", TMSG_INFO(qwMsg->msgType + 1), qwMsg->connInfo.handle, code, tstrerror(code),
dataLen);
}
......
......@@ -138,7 +138,17 @@ int32_t schUpdateTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int3
return TSDB_CODE_SUCCESS;
}
if ((execId != pTask->execId) || pTask->waitRetry) { // ignore it
SCH_TASK_DLOG("handle not updated since execId %d is already not current execId %d, waitRetry %d", execId, pTask->execId, pTask->waitRetry);
return TSDB_CODE_SUCCESS;
}
SSchNodeInfo *nodeInfo = taosHashGet(pTask->execNodes, &execId, sizeof(execId));
if (NULL == nodeInfo) { // ignore it
SCH_TASK_DLOG("handle not updated since execId %d already not exist, current execId %d, waitRetry %d", execId, pTask->execId, pTask->waitRetry);
return TSDB_CODE_SUCCESS;
}
nodeInfo->handle = handle;
SCH_TASK_DLOG("handle updated to %p for execId %d", handle, execId);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册