From fc04aa847c122bedbf8fc1abcf073e72699a41d2 Mon Sep 17 00:00:00 2001 From: dapan1121 <89396746@qq.com> Date: Sat, 4 Jun 2022 09:57:16 +0800 Subject: [PATCH] fix qnode mnode query issue --- source/dnode/mgmt/mgmt_mnode/inc/mmInt.h | 1 + source/dnode/mgmt/mgmt_mnode/src/mmWorker.c | 2 ++ source/dnode/mgmt/mgmt_qnode/inc/qmInt.h | 2 ++ source/dnode/mgmt/mgmt_qnode/src/qmWorker.c | 2 ++ source/dnode/mnode/impl/src/mndQuery.c | 8 ++++++++ source/dnode/qnode/src/qnode.c | 8 ++++++++ source/libs/qworker/src/qwDbg.c | 3 ++- source/libs/qworker/src/qwMsg.c | 12 ++++++------ source/libs/qworker/src/qworker.c | 4 ++-- source/libs/scheduler/src/schRemote.c | 19 +++++++++++++------ 10 files changed, 46 insertions(+), 15 deletions(-) diff --git a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h index 9a0cfdfc93..22691300bc 100644 --- a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h +++ b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h @@ -56,6 +56,7 @@ int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t mndPreprocessQueryMsg(SMnode * pMnode, SRpcMsg * pMsg); // mmWorker.c int32_t mmStartWorker(SMnodeMgmt *pMgmt); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index 460f2242f2..42fa7b718e 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -89,6 +89,8 @@ int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { } int32_t mmPutNodeMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { + mndPreprocessQueryMsg(pMgmt->pMnode, pMsg); + return mmPutNodeMsgToWorker(&pMgmt->queryWorker, pMsg); } diff --git a/source/dnode/mgmt/mgmt_qnode/inc/qmInt.h b/source/dnode/mgmt/mgmt_qnode/inc/qmInt.h index 54e9da24a4..acc101386b 100644 --- a/source/dnode/mgmt/mgmt_qnode/inc/qmInt.h +++ b/source/dnode/mgmt/mgmt_qnode/inc/qmInt.h @@ -51,6 +51,8 @@ int32_t qmPutNodeMsgToQueryQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t qmPutNodeMsgToFetchQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t qmPutNodeMsgToMonitorQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t qndPreprocessQueryMsg(SQnode *pQnode, SRpcMsg * pMsg); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c b/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c index e36efa83db..6814643b59 100644 --- a/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c +++ b/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c @@ -57,6 +57,8 @@ static int32_t qmPutNodeMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pMsg) { } int32_t qmPutNodeMsgToQueryQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg) { + qndPreprocessQueryMsg(pMgmt->pQnode, pMsg); + return qmPutNodeMsgToWorker(&pMgmt->queryWorker, pMsg); } diff --git a/source/dnode/mnode/impl/src/mndQuery.c b/source/dnode/mnode/impl/src/mndQuery.c index 97594f2b91..12b39e5b78 100644 --- a/source/dnode/mnode/impl/src/mndQuery.c +++ b/source/dnode/mnode/impl/src/mndQuery.c @@ -18,6 +18,14 @@ #include "mndMnode.h" #include "qworker.h" +int32_t mndPreprocessQueryMsg(SMnode * pMnode, SRpcMsg * pMsg) { + if (TDMT_VND_QUERY != pMsg->msgType) { + return 0; + } + + return qWorkerPreprocessQueryMsg(pMnode->pQuery, pMsg); +} + int32_t mndProcessQueryMsg(SRpcMsg *pMsg) { int32_t code = -1; SMnode *pMnode = pMsg->info.node; diff --git a/source/dnode/qnode/src/qnode.c b/source/dnode/qnode/src/qnode.c index 438982ac6a..45b88318c4 100644 --- a/source/dnode/qnode/src/qnode.c +++ b/source/dnode/qnode/src/qnode.c @@ -63,6 +63,14 @@ int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad) { return 0; } +int32_t qndPreprocessQueryMsg(SQnode *pQnode, SRpcMsg * pMsg) { + if (TDMT_VND_QUERY != pMsg->msgType) { + return 0; + } + + return qWorkerPreprocessQueryMsg(pQnode->pQuery, pMsg); +} + int32_t qndProcessQueryMsg(SQnode *pQnode, int64_t ts, SRpcMsg *pMsg) { int32_t code = -1; SReadHandle handle = {.pMsgCb = &pQnode->msgCb}; diff --git a/source/libs/qworker/src/qwDbg.c b/source/libs/qworker/src/qwDbg.c index 27fe22295d..3914541157 100644 --- a/source/libs/qworker/src/qwDbg.c +++ b/source/libs/qworker/src/qwDbg.c @@ -36,7 +36,8 @@ int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, break; case JOB_TASK_STATUS_NOT_START: - if (newStatus != JOB_TASK_STATUS_CANCELLED) { + if (newStatus != JOB_TASK_STATUS_DROPPING && newStatus != JOB_TASK_STATUS_EXECUTING + && newStatus != JOB_TASK_STATUS_FAILED) { QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index 317877c253..2f1ecc5172 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -300,12 +300,12 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - msg->sId = be64toh(msg->sId); - msg->queryId = be64toh(msg->queryId); - msg->taskId = be64toh(msg->taskId); - msg->refId = be64toh(msg->refId); - msg->phyLen = ntohl(msg->phyLen); - msg->sqlLen = ntohl(msg->sqlLen); + msg->sId = msg->sId; + msg->queryId = msg->queryId; + msg->taskId = msg->taskId; + msg->refId = msg->refId; + msg->phyLen = msg->phyLen; + msg->sqlLen = msg->sqlLen; uint64_t sId = msg->sId; uint64_t qId = msg->queryId; diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 9ad36a32c4..8c0366fa0b 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -446,6 +446,8 @@ int32_t qwPrerocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), &ctx)); + ctx->ctrlConnInfo = qwMsg->connInfo; + QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_NOT_START)); _return: @@ -475,8 +477,6 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex atomic_store_8(&ctx->taskType, taskType); atomic_store_8(&ctx->explain, explain); - ctx->ctrlConnInfo = qwMsg->connInfo; - /*QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg);*/ code = qStringToSubplan(qwMsg->msg, &plan); diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index a940e45b43..88caf7438e 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -774,13 +774,20 @@ int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) { strcpy(epId.ep.fqdn, pEp->fqdn); epId.ep.port = pEp->port; - SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, &epId, sizeof(SQueryNodeEpId)); - if (NULL == hb) { - bool exist = false; - SCH_ERR_RET(schRegisterHbConnection(pJob, pTask, &epId, &exist)); - if (!exist) { - SCH_ERR_RET(schBuildAndSendHbMsg(&epId, NULL)); + SSchHbTrans *hb = NULL; + while (true) { + hb = taosHashGet(schMgmt.hbConnections, &epId, sizeof(SQueryNodeEpId)); + if (NULL == hb) { + bool exist = false; + SCH_ERR_RET(schRegisterHbConnection(pJob, pTask, &epId, &exist)); + if (!exist) { + SCH_ERR_RET(schBuildAndSendHbMsg(&epId, NULL)); + } + + continue; } + + break; } atomic_add_fetch_64(&hb->taskNum, 1); -- GitLab