未验证 提交 bf3a2de6 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #20003 from taosdata/fix/TD-22457

fix: rsp msg async handled issue
...@@ -206,6 +206,8 @@ typedef struct SQWorkerMgmt { ...@@ -206,6 +206,8 @@ typedef struct SQWorkerMgmt {
int32_t paramIdx; int32_t paramIdx;
} SQWorkerMgmt; } SQWorkerMgmt;
#define QW_CTX_NOT_EXISTS_ERR_CODE(mgmt) (atomic_load_8(&(mgmt)->nodeStopped) ? TSDB_CODE_VND_STOPPED : TSDB_CODE_QRY_TASK_CTX_NOT_EXIST)
#define QW_FPARAMS_DEF SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId #define QW_FPARAMS_DEF SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId
#define QW_IDS() sId, qId, tId, rId, eId #define QW_IDS() sId, qId, tId, rId, eId
#define QW_FPARAMS() mgmt, QW_IDS() #define QW_FPARAMS() mgmt, QW_IDS()
......
...@@ -213,15 +213,9 @@ int32_t qwAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) { ...@@ -213,15 +213,9 @@ int32_t qwAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
QW_SET_QTID(id, qId, tId, eId); QW_SET_QTID(id, qId, tId, eId);
*ctx = taosHashAcquire(mgmt->ctxHash, id, sizeof(id)); *ctx = taosHashAcquire(mgmt->ctxHash, id, sizeof(id));
int8_t nodeStopped = atomic_load_8(&mgmt->nodeStopped);
if (NULL == (*ctx)) { if (NULL == (*ctx)) {
if (!nodeStopped) { QW_TASK_DLOG_E("acquired task ctx not exist, may be dropped");
QW_TASK_DLOG_E("task ctx not exist, may be dropped"); QW_ERR_RET(QW_CTX_NOT_EXISTS_ERR_CODE(mgmt));
QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
} else {
QW_TASK_DLOG_E("node stopped");
QW_ERR_RET(TSDB_CODE_VND_STOPPED);
}
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -232,16 +226,9 @@ int32_t qwGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) { ...@@ -232,16 +226,9 @@ int32_t qwGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
QW_SET_QTID(id, qId, tId, eId); QW_SET_QTID(id, qId, tId, eId);
*ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id)); *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id));
int8_t nodeStopped = atomic_load_8(&mgmt->nodeStopped);
if (NULL == (*ctx)) { if (NULL == (*ctx)) {
if (!nodeStopped) { QW_TASK_DLOG_E("get task ctx not exist, may be dropped");
QW_TASK_DLOG_E("task ctx not exist, may be dropped"); QW_ERR_RET(QW_CTX_NOT_EXISTS_ERR_CODE(mgmt));
QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
} else {
QW_TASK_DLOG_E("node stopped");
QW_ERR_RET(TSDB_CODE_VND_STOPPED);
}
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -334,7 +321,8 @@ int32_t qwDropTaskCtx(QW_FPARAMS_DEF) { ...@@ -334,7 +321,8 @@ int32_t qwDropTaskCtx(QW_FPARAMS_DEF) {
SQWTaskCtx *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id)); SQWTaskCtx *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id));
if (NULL == ctx) { if (NULL == ctx) {
QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST); QW_TASK_DLOG_E("drop task ctx not exist, may be dropped");
QW_ERR_RET(QW_CTX_NOT_EXISTS_ERR_CODE(mgmt));
} }
octx = *ctx; octx = *ctx;
...@@ -346,7 +334,7 @@ int32_t qwDropTaskCtx(QW_FPARAMS_DEF) { ...@@ -346,7 +334,7 @@ int32_t qwDropTaskCtx(QW_FPARAMS_DEF) {
if (taosHashRemove(mgmt->ctxHash, id, sizeof(id))) { if (taosHashRemove(mgmt->ctxHash, id, sizeof(id))) {
QW_TASK_ELOG_E("taosHashRemove from ctx hash failed"); QW_TASK_ELOG_E("taosHashRemove from ctx hash failed");
QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST); QW_ERR_RET(QW_CTX_NOT_EXISTS_ERR_CODE(mgmt));
} }
qwFreeTaskCtx(&octx); qwFreeTaskCtx(&octx);
......
...@@ -230,6 +230,7 @@ typedef struct SSchTask { ...@@ -230,6 +230,7 @@ typedef struct SSchTask {
SSchRedirectCtx redirectCtx; // task redirect context SSchRedirectCtx redirectCtx; // task redirect context
bool waitRetry; // wait for retry bool waitRetry; // wait for retry
int32_t execId; // task current execute index int32_t execId; // task current execute index
int32_t failedExecId; // last failed task execute index
SSchLevel *level; // level SSchLevel *level; // level
SRWLatch planLock; // task update plan lock SRWLatch planLock; // task update plan lock
SSubplan *plan; // subplan SSubplan *plan; // subplan
......
...@@ -34,12 +34,12 @@ int32_t schValidateRspMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) { ...@@ -34,12 +34,12 @@ int32_t schValidateRspMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
if (lastMsgType != reqMsgType) { if (lastMsgType != reqMsgType) {
SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType), SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType),
TMSG_INFO(msgType)); TMSG_INFO(msgType));
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); SCH_ERR_RET(TSDB_CODE_QW_MSG_ERROR);
} }
if (taskStatus != JOB_TASK_STATUS_PART_SUCC) { if (taskStatus != JOB_TASK_STATUS_PART_SUCC) {
SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus), SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
TMSG_INFO(msgType)); TMSG_INFO(msgType));
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); SCH_ERR_RET(TSDB_CODE_QW_MSG_ERROR);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -60,13 +60,13 @@ int32_t schValidateRspMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) { ...@@ -60,13 +60,13 @@ int32_t schValidateRspMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
if (lastMsgType != reqMsgType) { if (lastMsgType != reqMsgType) {
SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType), SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType),
TMSG_INFO(msgType)); TMSG_INFO(msgType));
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); SCH_ERR_RET(TSDB_CODE_QW_MSG_ERROR);
} }
if (taskStatus != JOB_TASK_STATUS_EXEC) { if (taskStatus != JOB_TASK_STATUS_EXEC) {
SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus), SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
TMSG_INFO(msgType)); TMSG_INFO(msgType));
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); SCH_ERR_RET(TSDB_CODE_QW_MSG_ERROR);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -64,6 +64,7 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel * ...@@ -64,6 +64,7 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *
pTask->plan = pPlan; pTask->plan = pPlan;
pTask->level = pLevel; pTask->level = pLevel;
pTask->execId = -1; pTask->execId = -1;
pTask->failedExecId = -2;
pTask->timeoutUsec = SCH_DEFAULT_TASK_TIMEOUT_USEC; pTask->timeoutUsec = SCH_DEFAULT_TASK_TIMEOUT_USEC;
pTask->taskId = schGenTaskId(); pTask->taskId = schGenTaskId();
...@@ -166,7 +167,7 @@ int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, v ...@@ -166,7 +167,7 @@ int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, v
schUpdateTaskExecNode(pJob, pTask, handle, execId); schUpdateTaskExecNode(pJob, pTask, handle, execId);
if ((execId != pTask->execId) || pTask->waitRetry) { // ignore it if ((execId != pTask->execId || execId <= pTask->failedExecId) || pTask->waitRetry) { // ignore it
SCH_TASK_DLOG("handle not updated since execId %d is already not current execId %d, waitRetry %d", execId, SCH_TASK_DLOG("handle not updated since execId %d is already not current execId %d, waitRetry %d", execId,
pTask->execId, pTask->waitRetry); pTask->execId, pTask->waitRetry);
SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR); SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
...@@ -182,6 +183,8 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) ...@@ -182,6 +183,8 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode)
return TSDB_CODE_SCH_IGNORE_ERROR; return TSDB_CODE_SCH_IGNORE_ERROR;
} }
pTask->failedExecId = pTask->execId;
int8_t jobStatus = 0; int8_t jobStatus = 0;
if (schJobNeedToStop(pJob, &jobStatus)) { if (schJobNeedToStop(pJob, &jobStatus)) {
SCH_TASK_DLOG("no more task failure processing cause of job status %s", jobTaskStatusStr(jobStatus)); SCH_TASK_DLOG("no more task failure processing cause of job status %s", jobTaskStatusStr(jobStatus));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册