提交 0048bd8c 编写于 作者: D dapan1121

fix: fix redirect issue

上级 53103acc
......@@ -1276,7 +1276,7 @@ int32_t doProcessMsgFromServer(void* param) {
assert(pMsg->info.ahandle != NULL);
STscObj* pTscObj = NULL;
tscDebug("processMsgFromServer message: %s, code: %s", TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code));
tscDebug("processMsgFromServer handle %p, message: %s, code: %s", pMsg->info.handle, TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code));
if (pSendInfo->requestObjRefId != 0) {
SRequestObj* pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);
......
......@@ -188,8 +188,8 @@ void qwDbgSimulateDead(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t msgType) {
return;
}
//FETCH OR QUERY USE DIFFERENT CONNINFO
qwBuildAndSendErrorRsp(msgType + 1, ctx->dataConnInfo, TSDB_CODE_RPC_BROKEN_LINK);
SRpcHandleInfo *pConn = ((msgType == TDMT_SCH_FETCH || msgType == TDMT_SCH_MERGE_FETCH) ? &ctx->dataConnInfo : &ctx->ctrlConnInfo);
qwBuildAndSendErrorRsp(msgType + 1, pConn, TSDB_CODE_RPC_BROKEN_LINK);
qwDropTask(QW_FPARAMS());
}
......
......@@ -426,7 +426,7 @@ _return:
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_PART_SUCC);
}
if (QW_PHASE_POST_QUERY == phase) {
if (QW_PHASE_POST_QUERY == phase && ctx) {
ctx->queryRsped = true;
qwBuildAndSendQueryRsp(input->msgType + 1, &ctx->ctrlConnInfo, code, ctx);
QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
......@@ -730,11 +730,9 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
if (QW_QUERY_RUNNING(ctx)) {
QW_ERR_JRET(qwKillTaskHandle(ctx));
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROP);
} else if (QW_GET_PHASE(ctx) > 0) {
} else {
QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
dropped = true;
} else {
// task not started
}
if (!dropped) {
......
......@@ -58,7 +58,7 @@ typedef enum {
#define SCH_DEFAULT_TASK_TIMEOUT_USEC 10000000
#define SCH_MAX_TASK_TIMEOUT_USEC 60000000
#define SCH_MAX_CANDIDATE_EP_NUM TSDB_MAX_REPLICA
#define SCH_MAX_CANDIDATE_EP_NUM (TSDB_MAX_REPLICA + 100)
......@@ -211,6 +211,7 @@ typedef struct SSchTask {
int32_t maxExecTimes; // task max exec times
int32_t maxRetryTimes; // task max retry times
int32_t retryTimes; // task retry times
bool waitRetry; // wait for retry
int32_t execId; // task current execute index
SSchLevel *level; // level
SRWLatch planLock; // task update plan lock
......
......@@ -1010,7 +1010,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
memcpy(pMsg->msg + len, pTask->msg, pTask->msgLen);
persistHandle = true;
//SCH_SET_TASK_HANDLE(pTask, rpcAllocHandle());
SCH_SET_TASK_HANDLE(pTask, rpcAllocHandle());
break;
}
case TDMT_SCH_FETCH:
......
......@@ -125,8 +125,8 @@ int32_t schDropTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_
SCH_TASK_DLOG("execId %d removed from execNodeList", execId);
}
if (execId != pTask->execId) { // ignore it
SCH_TASK_DLOG("execId %d is not current execId %d", execId, pTask->execId);
if ((execId != pTask->execId) || pTask->waitRetry) { // ignore it
SCH_TASK_DLOG("execId %d is already not current execId %d, waitRetry %d", execId, pTask->execId, pTask->waitRetry);
SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
}
......@@ -335,6 +335,7 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32
return TSDB_CODE_SUCCESS;
}
pTask->waitRetry = true;
schDropTaskOnExecNode(pJob, pTask);
taosHashClear(pTask->execNodes);
SCH_ERR_JRET(schRemoveTaskFromExecList(pJob, pTask));
......@@ -790,6 +791,7 @@ int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1);
pTask->execId++;
pTask->retryTimes++;
pTask->waitRetry = false;
SCH_TASK_DLOG("start to launch task, execId %d, retry %d", pTask->execId, pTask->retryTimes);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册