提交 fc04aa84 编写于 作者: D dapan1121

fix qnode mnode query issue

上级 1b0e1146
...@@ -56,6 +56,7 @@ int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); ...@@ -56,6 +56,7 @@ int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mndPreprocessQueryMsg(SMnode * pMnode, SRpcMsg * pMsg);
// mmWorker.c // mmWorker.c
int32_t mmStartWorker(SMnodeMgmt *pMgmt); int32_t mmStartWorker(SMnodeMgmt *pMgmt);
......
...@@ -89,6 +89,8 @@ int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -89,6 +89,8 @@ int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
} }
int32_t mmPutNodeMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t mmPutNodeMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
mndPreprocessQueryMsg(pMgmt->pMnode, pMsg);
return mmPutNodeMsgToWorker(&pMgmt->queryWorker, pMsg); return mmPutNodeMsgToWorker(&pMgmt->queryWorker, pMsg);
} }
......
...@@ -51,6 +51,8 @@ int32_t qmPutNodeMsgToQueryQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg); ...@@ -51,6 +51,8 @@ int32_t qmPutNodeMsgToQueryQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t qmPutNodeMsgToFetchQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t qmPutNodeMsgToFetchQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t qmPutNodeMsgToMonitorQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t qmPutNodeMsgToMonitorQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t qndPreprocessQueryMsg(SQnode *pQnode, SRpcMsg * pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -57,6 +57,8 @@ static int32_t qmPutNodeMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pMsg) { ...@@ -57,6 +57,8 @@ static int32_t qmPutNodeMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pMsg) {
} }
int32_t qmPutNodeMsgToQueryQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t qmPutNodeMsgToQueryQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg) {
qndPreprocessQueryMsg(pMgmt->pQnode, pMsg);
return qmPutNodeMsgToWorker(&pMgmt->queryWorker, pMsg); return qmPutNodeMsgToWorker(&pMgmt->queryWorker, pMsg);
} }
......
...@@ -18,6 +18,14 @@ ...@@ -18,6 +18,14 @@
#include "mndMnode.h" #include "mndMnode.h"
#include "qworker.h" #include "qworker.h"
int32_t mndPreprocessQueryMsg(SMnode * pMnode, SRpcMsg * pMsg) {
if (TDMT_VND_QUERY != pMsg->msgType) {
return 0;
}
return qWorkerPreprocessQueryMsg(pMnode->pQuery, pMsg);
}
int32_t mndProcessQueryMsg(SRpcMsg *pMsg) { int32_t mndProcessQueryMsg(SRpcMsg *pMsg) {
int32_t code = -1; int32_t code = -1;
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
......
...@@ -63,6 +63,14 @@ int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad) { ...@@ -63,6 +63,14 @@ int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad) {
return 0; return 0;
} }
int32_t qndPreprocessQueryMsg(SQnode *pQnode, SRpcMsg * pMsg) {
if (TDMT_VND_QUERY != pMsg->msgType) {
return 0;
}
return qWorkerPreprocessQueryMsg(pQnode->pQuery, pMsg);
}
int32_t qndProcessQueryMsg(SQnode *pQnode, int64_t ts, SRpcMsg *pMsg) { int32_t qndProcessQueryMsg(SQnode *pQnode, int64_t ts, SRpcMsg *pMsg) {
int32_t code = -1; int32_t code = -1;
SReadHandle handle = {.pMsgCb = &pQnode->msgCb}; SReadHandle handle = {.pMsgCb = &pQnode->msgCb};
......
...@@ -36,7 +36,8 @@ int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, ...@@ -36,7 +36,8 @@ int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus,
break; break;
case JOB_TASK_STATUS_NOT_START: case JOB_TASK_STATUS_NOT_START:
if (newStatus != JOB_TASK_STATUS_CANCELLED) { if (newStatus != JOB_TASK_STATUS_DROPPING && newStatus != JOB_TASK_STATUS_EXECUTING
&& newStatus != JOB_TASK_STATUS_FAILED) {
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
} }
......
...@@ -300,12 +300,12 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int ...@@ -300,12 +300,12 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
msg->sId = be64toh(msg->sId); msg->sId = msg->sId;
msg->queryId = be64toh(msg->queryId); msg->queryId = msg->queryId;
msg->taskId = be64toh(msg->taskId); msg->taskId = msg->taskId;
msg->refId = be64toh(msg->refId); msg->refId = msg->refId;
msg->phyLen = ntohl(msg->phyLen); msg->phyLen = msg->phyLen;
msg->sqlLen = ntohl(msg->sqlLen); msg->sqlLen = msg->sqlLen;
uint64_t sId = msg->sId; uint64_t sId = msg->sId;
uint64_t qId = msg->queryId; uint64_t qId = msg->queryId;
......
...@@ -446,6 +446,8 @@ int32_t qwPrerocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { ...@@ -446,6 +446,8 @@ int32_t qwPrerocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), &ctx)); QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), &ctx));
ctx->ctrlConnInfo = qwMsg->connInfo;
QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_NOT_START)); QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_NOT_START));
_return: _return:
...@@ -475,8 +477,6 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex ...@@ -475,8 +477,6 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex
atomic_store_8(&ctx->taskType, taskType); atomic_store_8(&ctx->taskType, taskType);
atomic_store_8(&ctx->explain, explain); atomic_store_8(&ctx->explain, explain);
ctx->ctrlConnInfo = qwMsg->connInfo;
/*QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg);*/ /*QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg);*/
code = qStringToSubplan(qwMsg->msg, &plan); code = qStringToSubplan(qwMsg->msg, &plan);
......
...@@ -774,13 +774,20 @@ int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) { ...@@ -774,13 +774,20 @@ int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) {
strcpy(epId.ep.fqdn, pEp->fqdn); strcpy(epId.ep.fqdn, pEp->fqdn);
epId.ep.port = pEp->port; epId.ep.port = pEp->port;
SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, &epId, sizeof(SQueryNodeEpId)); SSchHbTrans *hb = NULL;
while (true) {
hb = taosHashGet(schMgmt.hbConnections, &epId, sizeof(SQueryNodeEpId));
if (NULL == hb) { if (NULL == hb) {
bool exist = false; bool exist = false;
SCH_ERR_RET(schRegisterHbConnection(pJob, pTask, &epId, &exist)); SCH_ERR_RET(schRegisterHbConnection(pJob, pTask, &epId, &exist));
if (!exist) { if (!exist) {
SCH_ERR_RET(schBuildAndSendHbMsg(&epId, NULL)); SCH_ERR_RET(schBuildAndSendHbMsg(&epId, NULL));
} }
continue;
}
break;
} }
atomic_add_fetch_64(&hb->taskNum, 1); atomic_add_fetch_64(&hb->taskNum, 1);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册