diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 941deb03cd0af0db9f733a9a9d658c2247bff40a..e61123ef913b60214818720eacf256ec3f81d664 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -166,7 +166,7 @@ int32_t qwSetTaskStatus(QW_FPARAMS_DEF, SQWTaskStatus *task, int8_t status) { } -int32_t qwAddSchedulerImpl(SQWorkerMgmt *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch) { +int32_t qwAddSchedulerImpl(SQWorkerMgmt *mgmt, uint64_t sId, int32_t rwType) { SQWSchStatus newSch = {0}; newSch.tasksHash = taosHashInit(mgmt->cfg.maxSchTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); if (NULL == newSch.tasksHash) { @@ -200,7 +200,7 @@ int32_t qwAcquireSchedulerImpl(SQWorkerMgmt *mgmt, uint64_t sId, int32_t rwType, QW_UNLOCK(rwType, &mgmt->schLock); if (QW_NOT_EXIST_ADD == nOpt) { - QW_ERR_RET(qwAddSchedulerImpl(mgmt, sId, rwType, sch)); + QW_ERR_RET(qwAddSchedulerImpl(mgmt, sId, rwType)); nOpt = QW_NOT_EXIST_RET_ERR; diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index e74507bdf7f79ee90ee80254a412d3a2d7249f83..529f27188e81595236ab48f6a1d2d08205450f8e 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -125,7 +125,7 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m case TDMT_SCH_LINK_BROKEN: return TSDB_CODE_SUCCESS; case TDMT_VND_QUERY_RSP: // query_rsp may be processed later than ready_rsp - if (lastMsgType != reqMsgType) { + if (lastMsgType != reqMsgType && -1 != lastMsgType && TDMT_VND_FETCH != lastMsgType) { SCH_TASK_DLOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType), TMSG_INFO(msgType)); } @@ -1776,6 +1776,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, pMsg->sId = htobe64(schMgmt.sId); pMsg->queryId = htobe64(pJob->queryId); pMsg->taskId = htobe64(pTask->taskId); + break; } case TDMT_VND_DROP_TASK: {