提交 4a55ed07 编写于 作者: D dapan1121

feature: query redirect

上级 7da915fa
......@@ -167,10 +167,6 @@ enum {
TD_NEW_MSG_SEG(TDMT_VND_MSG)
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp)
TD_DEF_MSG_TYPE(TDMT_VND_QUERY, "query", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_QUERY_CONTINUE, "query-continue", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_QUERY_HEARTBEAT, "query-heartbeat", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_FETCH, "fetch", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_TABLE, "create-table", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_TABLE, "alter-table", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_TABLE, "drop-table", NULL, NULL)
......@@ -184,12 +180,9 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_CHANGE, "vnode-mq-vg-change", SMqRebVgReq, SMqRebVgRsp)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_DELETE, "vnode-mq-vg-delete", SMqVDeleteReq, SMqVDeleteRsp)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_COMMIT_OFFSET, "vnode-commit-offset", STqOffset, STqOffset)
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_TASK, "vnode-cancel-task", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_TASK, "vnode-drop-task", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_TOPIC, "vnode-create-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_TOPIC, "vnode-alter-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_TOPIC, "vnode-drop-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_EXPLAIN, "vnode-explain", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp)
TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqPollReq, SMqDataBlkRsp)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL)
......@@ -206,6 +199,15 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_COMPACT, "compact", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_TTL_TABLE, "drop-ttl-stb", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_SCH_MSG)
TD_DEF_MSG_TYPE(TDMT_SCH_QUERY, "query", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SCH_QUERY_CONTINUE, "query-continue", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SCH_QUERY_HEARTBEAT, "query-heartbeat", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SCH_FETCH, "fetch", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SCH_CANCEL_TASK, "vnode-cancel-task", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SCH_DROP_TASK, "vnode-drop-task", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SCH_EXPLAIN, "vnode-explain", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_STREAM_MSG)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DEPLOY, "stream-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DROP, "stream-task-drop", NULL, NULL)
......
......@@ -91,7 +91,7 @@ void closeTransporter(SAppInstInfo *pAppInfo) {
static bool clientRpcRfp(int32_t code, tmsg_t msgType) {
if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED ||
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY) {
if (msgType == TDMT_VND_QUERY || msgType == TDMT_VND_FETCH) {
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_FETCH) {
return false;
}
return true;
......
......@@ -808,7 +808,7 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) {
code = handleSubmitExecRes(pRequest, pRes->res, pCatalog, &epset);
break;
}
case TDMT_VND_QUERY: {
case TDMT_SCH_QUERY: {
code = handleQueryExecRes(pRequest, pRes->res, pCatalog, &epset);
break;
}
......
......@@ -209,10 +209,10 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_SHOW_VARIABLES, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_SERVER_VERSION, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_CONTINUE, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
......@@ -220,7 +220,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
......
......@@ -107,14 +107,14 @@ SArray *qmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MON_QM_INFO, qmPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER;
// Requests handled by VNODE
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY, qmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_CONTINUE, qmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, qmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, qmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_RSP, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CANCEL_TASK, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_CANCEL_TASK, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
code = 0;
_OVER:
......
......@@ -324,16 +324,16 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MON_VM_LOAD, vmPutMsgToMonitorQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_CONTINUE, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_UPDATE_TAG_VAL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_CFG, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CANCEL_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_CANCEL_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TTL_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
......@@ -349,7 +349,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CONSUME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
......
......@@ -251,7 +251,7 @@ static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) {
static bool rpcRfp(int32_t code, tmsg_t msgType) {
if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED ||
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY) {
if (msgType == TDMT_VND_QUERY || msgType == TDMT_VND_FETCH) {
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_FETCH) {
return false;
}
return true;
......
......@@ -527,9 +527,9 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
if (!IsReq(pMsg)) return 0;
if (pMsg->msgType == TDMT_VND_QUERY || pMsg->msgType == TDMT_VND_QUERY_CONTINUE ||
pMsg->msgType == TDMT_VND_QUERY_HEARTBEAT || pMsg->msgType == TDMT_VND_FETCH ||
pMsg->msgType == TDMT_VND_DROP_TASK) {
if (pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_SCH_QUERY_CONTINUE ||
pMsg->msgType == TDMT_SCH_QUERY_HEARTBEAT || pMsg->msgType == TDMT_SCH_FETCH ||
pMsg->msgType == TDMT_SCH_DROP_TASK) {
return 0;
}
if (mndAcquireRpcRef(pMsg->info.node) == 0) return 0;
......
......@@ -19,13 +19,13 @@
#include "qworker.h"
int32_t mndPreProcessQueryMsg(SRpcMsg *pMsg) {
if (TDMT_VND_QUERY != pMsg->msgType) return 0;
if (TDMT_SCH_QUERY != pMsg->msgType) return 0;
SMnode *pMnode = pMsg->info.node;
return qWorkerPreprocessQueryMsg(pMnode->pQuery, pMsg);
}
void mndPostProcessQueryMsg(SRpcMsg *pMsg) {
if (TDMT_VND_QUERY != pMsg->msgType) return;
if (TDMT_SCH_QUERY != pMsg->msgType) return;
SMnode *pMnode = pMsg->info.node;
qWorkerAbortPreprocessQueryMsg(pMnode->pQuery, pMsg);
}
......@@ -37,19 +37,19 @@ int32_t mndProcessQueryMsg(SRpcMsg *pMsg) {
mTrace("msg:%p, in query queue is processing", pMsg);
switch (pMsg->msgType) {
case TDMT_VND_QUERY:
case TDMT_SCH_QUERY:
code = qWorkerProcessQueryMsg(&handle, pMnode->pQuery, pMsg, 0);
break;
case TDMT_VND_QUERY_CONTINUE:
case TDMT_SCH_QUERY_CONTINUE:
code = qWorkerProcessCQueryMsg(&handle, pMnode->pQuery, pMsg, 0);
break;
case TDMT_VND_FETCH:
case TDMT_SCH_FETCH:
code = qWorkerProcessFetchMsg(pMnode, pMnode->pQuery, pMsg, 0);
break;
case TDMT_VND_DROP_TASK:
case TDMT_SCH_DROP_TASK:
code = qWorkerProcessDropMsg(pMnode, pMnode->pQuery, pMsg, 0);
break;
case TDMT_VND_QUERY_HEARTBEAT:
case TDMT_SCH_QUERY_HEARTBEAT:
code = qWorkerProcessHbMsg(pMnode, pMnode->pQuery, pMsg, 0);
break;
default:
......@@ -67,11 +67,11 @@ int32_t mndInitQuery(SMnode *pMnode) {
return -1;
}
mndSetMsgHandle(pMnode, TDMT_VND_QUERY, mndProcessQueryMsg);
mndSetMsgHandle(pMnode, TDMT_VND_QUERY_CONTINUE, mndProcessQueryMsg);
mndSetMsgHandle(pMnode, TDMT_VND_FETCH, mndProcessQueryMsg);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_TASK, mndProcessQueryMsg);
mndSetMsgHandle(pMnode, TDMT_VND_QUERY_HEARTBEAT, mndProcessQueryMsg);
mndSetMsgHandle(pMnode, TDMT_SCH_QUERY, mndProcessQueryMsg);
mndSetMsgHandle(pMnode, TDMT_SCH_QUERY_CONTINUE, mndProcessQueryMsg);
mndSetMsgHandle(pMnode, TDMT_SCH_FETCH, mndProcessQueryMsg);
mndSetMsgHandle(pMnode, TDMT_SCH_DROP_TASK, mndProcessQueryMsg);
mndSetMsgHandle(pMnode, TDMT_SCH_QUERY_HEARTBEAT, mndProcessQueryMsg);
return 0;
}
......
......@@ -65,7 +65,7 @@ int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad) {
}
int32_t qndPreprocessQueryMsg(SQnode *pQnode, SRpcMsg * pMsg) {
if (TDMT_VND_QUERY != pMsg->msgType) {
if (TDMT_SCH_QUERY != pMsg->msgType) {
return 0;
}
......@@ -78,28 +78,28 @@ int32_t qndProcessQueryMsg(SQnode *pQnode, int64_t ts, SRpcMsg *pMsg) {
qTrace("message in qnode queue is processing");
switch (pMsg->msgType) {
case TDMT_VND_QUERY:
case TDMT_SCH_QUERY:
code = qWorkerProcessQueryMsg(&handle, pQnode->pQuery, pMsg, ts);
break;
case TDMT_VND_QUERY_CONTINUE:
case TDMT_SCH_QUERY_CONTINUE:
code = qWorkerProcessCQueryMsg(&handle, pQnode->pQuery, pMsg, ts);
break;
case TDMT_VND_FETCH:
case TDMT_SCH_FETCH:
code = qWorkerProcessFetchMsg(pQnode, pQnode->pQuery, pMsg, ts);
break;
case TDMT_VND_FETCH_RSP:
code = qWorkerProcessFetchRsp(pQnode, pQnode->pQuery, pMsg, ts);
break;
case TDMT_VND_CANCEL_TASK:
case TDMT_SCH_CANCEL_TASK:
code = qWorkerProcessCancelMsg(pQnode, pQnode->pQuery, pMsg, ts);
break;
case TDMT_VND_DROP_TASK:
case TDMT_SCH_DROP_TASK:
code = qWorkerProcessDropMsg(pQnode, pQnode->pQuery, pMsg, ts);
break;
case TDMT_VND_CONSUME:
// code = tqProcessConsumeReq(pQnode->pTq, pMsg);
// break;
case TDMT_VND_QUERY_HEARTBEAT:
case TDMT_SCH_QUERY_HEARTBEAT:
code = qWorkerProcessHbMsg(pQnode, pQnode->pQuery, pMsg, ts);
break;
default:
......
......@@ -216,7 +216,7 @@ _err:
}
int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
if (TDMT_VND_QUERY != pMsg->msgType) {
if (TDMT_SCH_QUERY != pMsg->msgType) {
return 0;
}
......@@ -227,9 +227,9 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
vTrace("message in vnode query queue is processing");
SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
switch (pMsg->msgType) {
case TDMT_VND_QUERY:
case TDMT_SCH_QUERY:
return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
case TDMT_VND_QUERY_CONTINUE:
case TDMT_SCH_QUERY_CONTINUE:
return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
default:
vError("unknown msg type:%d in query queue", pMsg->msgType);
......@@ -243,15 +243,15 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
switch (pMsg->msgType) {
case TDMT_VND_FETCH:
case TDMT_SCH_FETCH:
return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg, 0);
case TDMT_VND_FETCH_RSP:
return qWorkerProcessFetchRsp(pVnode, pVnode->pQuery, pMsg, 0);
case TDMT_VND_CANCEL_TASK:
case TDMT_SCH_CANCEL_TASK:
return qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg, 0);
case TDMT_VND_DROP_TASK:
case TDMT_SCH_DROP_TASK:
return qWorkerProcessDropMsg(pVnode, pVnode->pQuery, pMsg, 0);
case TDMT_VND_QUERY_HEARTBEAT:
case TDMT_SCH_QUERY_HEARTBEAT:
return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg, 0);
case TDMT_VND_TABLE_META:
return vnodeGetTableMeta(pVnode, pMsg);
......
......@@ -2035,7 +2035,7 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf
pMsgSendInfo->param = pWrapper;
pMsgSendInfo->msgInfo.pData = pMsg;
pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq);
pMsgSendInfo->msgType = TDMT_VND_FETCH;
pMsgSendInfo->msgType = TDMT_SCH_FETCH;
pMsgSendInfo->fp = loadRemoteDataCallback;
int64_t transporterId = 0;
......
......@@ -549,7 +549,7 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
static bool udfdRpcRfp(int32_t code, tmsg_t msgType) {
if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED ||
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY) {
if (msgType == TDMT_VND_QUERY || msgType == TDMT_VND_FETCH) {
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_FETCH) {
return false;
}
return true;
......
......@@ -5965,7 +5965,7 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) {
case QUERY_NODE_EXPLAIN_STMT:
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
pQuery->haveResultSet = true;
pQuery->msgType = TDMT_VND_QUERY;
pQuery->msgType = TDMT_SCH_QUERY;
break;
case QUERY_NODE_DELETE_STMT:
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
......
......@@ -1555,7 +1555,7 @@ static int32_t createPhysiSubplan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogic
if (SUBPLAN_TYPE_MODIFY == pLogicSubplan->subplanType) {
code = buildVnodeModifySubplan(pCxt, pLogicSubplan, pSubplan);
} else {
pSubplan->msgType = TDMT_VND_QUERY;
pSubplan->msgType = TDMT_SCH_QUERY;
code = createPhysiNode(pCxt, pLogicSubplan->pNode, pSubplan, &pSubplan->pNode);
if (TSDB_CODE_SUCCESS == code && !pCxt->pPlanCxt->streamQuery && !pCxt->pPlanCxt->topicQuery) {
code = createDataDispatcher(pCxt, pSubplan->pNode, &pSubplan->pDataSink);
......
......@@ -216,7 +216,7 @@ void destroyQueryExecRes(SQueryExecRes* pRes) {
tFreeSSubmitRsp((SSubmitRsp*)pRes->res);
break;
}
case TDMT_VND_QUERY: {
case TDMT_SCH_QUERY: {
taosArrayDestroy((SArray*)pRes->res);
break;
}
......
......@@ -169,7 +169,7 @@ int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
req->taskId = tId;
SRpcMsg pNewMsg = {
.msgType = TDMT_VND_QUERY_CONTINUE,
.msgType = TDMT_SCH_QUERY_CONTINUE,
.pCont = req,
.contLen = sizeof(SQueryContinueReq),
.code = 0,
......@@ -202,7 +202,7 @@ int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
req->refId = htobe64(rId);
SRpcMsg brokenMsg = {
.msgType = TDMT_VND_DROP_TASK,
.msgType = TDMT_SCH_DROP_TASK,
.pCont = req,
.contLen = sizeof(STaskDropReq),
.code = TSDB_CODE_RPC_NETWORK_UNAVAIL,
......@@ -236,7 +236,7 @@ int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo *
}
SRpcMsg brokenMsg = {
.msgType = TDMT_VND_QUERY_HEARTBEAT,
.msgType = TDMT_SCH_QUERY_HEARTBEAT,
.pCont = msg,
.contLen = msgSize,
.code = TSDB_CODE_RPC_NETWORK_UNAVAIL,
......
......@@ -122,7 +122,7 @@ void qwtBuildQueryReqMsg(SRpcMsg *queryRpc) {
qwtqueryMsg.taskId = htobe64(1);
qwtqueryMsg.phyLen = htonl(100);
qwtqueryMsg.sqlLen = 0;
queryRpc->msgType = TDMT_VND_QUERY;
queryRpc->msgType = TDMT_SCH_QUERY;
queryRpc->pCont = &qwtqueryMsg;
queryRpc->contLen = sizeof(SSubQueryMsg) + 100;
}
......@@ -131,7 +131,7 @@ void qwtBuildFetchReqMsg(SResFetchReq *fetchMsg, SRpcMsg *fetchRpc) {
fetchMsg->sId = htobe64(1);
fetchMsg->queryId = htobe64(atomic_load_64(&qwtTestQueryId));
fetchMsg->taskId = htobe64(1);
fetchRpc->msgType = TDMT_VND_FETCH;
fetchRpc->msgType = TDMT_SCH_FETCH;
fetchRpc->pCont = fetchMsg;
fetchRpc->contLen = sizeof(SResFetchReq);
}
......@@ -140,7 +140,7 @@ void qwtBuildDropReqMsg(STaskDropReq *dropMsg, SRpcMsg *dropRpc) {
dropMsg->sId = htobe64(1);
dropMsg->queryId = htobe64(atomic_load_64(&qwtTestQueryId));
dropMsg->taskId = htobe64(1);
dropRpc->msgType = TDMT_VND_DROP_TASK;
dropRpc->msgType = TDMT_SCH_DROP_TASK;
dropRpc->pCont = dropMsg;
dropRpc->contLen = sizeof(STaskDropReq);
}
......@@ -756,9 +756,9 @@ void *queryQueueThread(void *param) {
}
}
if (TDMT_VND_QUERY == queryRpc->msgType) {
if (TDMT_SCH_QUERY == queryRpc->msgType) {
qWorkerProcessQueryMsg(mockPointer, mgmt, queryRpc, 0);
} else if (TDMT_VND_QUERY_CONTINUE == queryRpc->msgType) {
} else if (TDMT_SCH_QUERY_CONTINUE == queryRpc->msgType) {
qWorkerProcessCQueryMsg(mockPointer, mgmt, queryRpc, 0);
} else {
printf("unknown msg in query queue, type:%d\n", queryRpc->msgType);
......@@ -813,13 +813,13 @@ void *fetchQueueThread(void *param) {
}
switch (fetchRpc->msgType) {
case TDMT_VND_FETCH:
case TDMT_SCH_FETCH:
qWorkerProcessFetchMsg(mockPointer, mgmt, fetchRpc, 0);
break;
case TDMT_VND_CANCEL_TASK:
case TDMT_SCH_CANCEL_TASK:
qWorkerProcessCancelMsg(mockPointer, mgmt, fetchRpc, 0);
break;
case TDMT_VND_DROP_TASK:
case TDMT_SCH_DROP_TASK:
qWorkerProcessDropMsg(mockPointer, mgmt, fetchRpc, 0);
break;
default:
......
......@@ -1183,7 +1183,7 @@ int32_t schFetchFromRemote(SSchJob *pJob) {
return TSDB_CODE_SUCCESS;
}
SCH_ERR_JRET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, TDMT_VND_FETCH));
SCH_ERR_JRET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, TDMT_SCH_FETCH));
return TSDB_CODE_SUCCESS;
......@@ -1222,7 +1222,7 @@ void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) {
while (nodeInfo) {
SCH_SET_TASK_HANDLE(pTask, nodeInfo->handle);
schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_VND_DROP_TASK);
schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_SCH_DROP_TASK);
nodeInfo = taosHashIterate(pTask->execNodes, nodeInfo);
}
......@@ -1307,7 +1307,7 @@ int32_t schSaveJobQueryRes(SSchJob *pJob, SQueryTableRsp *rsp) {
tbInfo.tversion = rsp->tversion;
taosArrayPush((SArray *)pJob->execRes.res, &tbInfo);
pJob->execRes.msgType = TDMT_VND_QUERY;
pJob->execRes.msgType = TDMT_SCH_QUERY;
}
return TSDB_CODE_SUCCESS;
......
......@@ -568,22 +568,22 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
case TDMT_VND_SUBMIT:
*fp = schHandleSubmitCallback;
break;
case TDMT_VND_QUERY:
case TDMT_SCH_QUERY:
*fp = schHandleQueryCallback;
break;
case TDMT_VND_DELETE:
*fp = schHandleDeleteCallback;
break;
case TDMT_VND_EXPLAIN:
case TDMT_SCH_EXPLAIN:
*fp = schHandleExplainCallback;
break;
case TDMT_VND_FETCH:
case TDMT_SCH_FETCH:
*fp = schHandleFetchCallback;
break;
case TDMT_VND_DROP_TASK:
case TDMT_SCH_DROP_TASK:
*fp = schHandleDropCallback;
break;
case TDMT_VND_QUERY_HEARTBEAT:
case TDMT_SCH_QUERY_HEARTBEAT:
*fp = schHandleHbCallback;
break;
case TDMT_SCH_LINK_BROKEN:
......@@ -694,7 +694,7 @@ int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
int32_t msgType = TDMT_VND_QUERY_HEARTBEAT_RSP;
__async_send_cb_fn_t fp = NULL;
SCH_ERR_JRET(schGetCallbackFp(TDMT_VND_QUERY_HEARTBEAT, &fp));
SCH_ERR_JRET(schGetCallbackFp(TDMT_SCH_QUERY_HEARTBEAT, &fp));
param->nodeEpId = epId;
param->pTrans = pJob->conn.pTrans;
......@@ -784,7 +784,7 @@ int32_t schMakeQueryRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
}
SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)};
SCH_ERR_JRET(schGenerateCallBackInfo(pJob, pTask, NULL, 0, TDMT_VND_EXPLAIN, &trans, false, &pExplainMsgSendInfo));
SCH_ERR_JRET(schGenerateCallBackInfo(pJob, pTask, NULL, 0, TDMT_SCH_EXPLAIN, &trans, false, &pExplainMsgSendInfo));
int32_t msgType = TDMT_VND_EXPLAIN_RSP;
SRpcCtxVal ctxVal = {.val = pExplainMsgSendInfo, .clone = schCloneSMsgSendInfo};
......@@ -882,7 +882,7 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, SSchTrans *trans, SQuery
SEpSet *epSet = &addr->epSet;
SMsgSendInfo *pMsgSendInfo = NULL;
bool isHb = (TDMT_VND_QUERY_HEARTBEAT == msgType);
bool isHb = (TDMT_SCH_QUERY_HEARTBEAT == msgType);
SCH_ERR_JRET(schGenerateCallBackInfo(pJob, pTask, msg, msgSize, msgType, trans, isHb, &pMsgSendInfo));
SCH_ERR_JRET(schUpdateSendTargetInfo(pMsgSendInfo, addr, pTask));
......@@ -926,7 +926,7 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId, SArray* taskAction) {
int32_t code = 0;
SRpcCtx rpcCtx = {0};
SSchTrans trans = {0};
int32_t msgType = TDMT_VND_QUERY_HEARTBEAT;
int32_t msgType = TDMT_SCH_QUERY_HEARTBEAT;
req.header.vgId = nodeEpId->nodeId;
req.sId = schMgmt.sId;
......@@ -1032,7 +1032,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
tSerializeSVDeleteReq(msg, msgSize, &req);
break;
}
case TDMT_VND_QUERY: {
case TDMT_SCH_QUERY: {
SCH_ERR_RET(schMakeQueryRpcCtx(pJob, pTask, &rpcCtx));
uint32_t len = strlen(pJob->sql);
......@@ -1060,7 +1060,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
persistHandle = true;
break;
}
case TDMT_VND_FETCH: {
case TDMT_SCH_FETCH: {
msgSize = sizeof(SResFetchReq);
msg = taosMemoryCalloc(1, msgSize);
if (NULL == msg) {
......@@ -1078,7 +1078,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
break;
}
case TDMT_VND_DROP_TASK: {
case TDMT_SCH_DROP_TASK: {
msgSize = sizeof(STaskDropReq);
msg = taosMemoryCalloc(1, msgSize);
if (NULL == msg) {
......@@ -1096,7 +1096,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
pMsg->refId = htobe64(pJob->refId);
break;
}
case TDMT_VND_QUERY_HEARTBEAT: {
case TDMT_SCH_QUERY_HEARTBEAT: {
SCH_ERR_RET(schMakeHbRpcCtx(pJob, pTask, &rpcCtx));
SSchedulerHbReq req = {0};
......@@ -1135,7 +1135,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
SCH_ERR_JRET(schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, msgSize, persistHandle,
(rpcCtx.args ? &rpcCtx : NULL)));
if (msgType == TDMT_VND_QUERY) {
if (msgType == TDMT_SCH_QUERY) {
SCH_ERR_RET(schAppendTaskExecNode(pJob, pTask, addr, pTask->execIdx));
}
......
......@@ -118,7 +118,7 @@ void schtBuildQueryDag(SQueryPlan *dag) {
scanPlan->level = 1;
scanPlan->pParents = nodesMakeList();
scanPlan->pNode = (SPhysiNode*)taosMemoryCalloc(1, sizeof(SPhysiNode));
scanPlan->msgType = TDMT_VND_QUERY;
scanPlan->msgType = TDMT_SCH_QUERY;
mergePlan->id.queryId = qId;
mergePlan->id.groupId = schtMergeTemplateId;
......@@ -130,7 +130,7 @@ void schtBuildQueryDag(SQueryPlan *dag) {
mergePlan->pChildren = nodesMakeList();
mergePlan->pParents = NULL;
mergePlan->pNode = (SPhysiNode*)taosMemoryCalloc(1, sizeof(SPhysiNode));
mergePlan->msgType = TDMT_VND_QUERY;
mergePlan->msgType = TDMT_SCH_QUERY;
merge->pNodeList = nodesMakeList();
scan->pNodeList = nodesMakeList();
......@@ -181,7 +181,7 @@ void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) {
scanPlan[i].level = 1;
scanPlan[i].pParents = nodesMakeList();
scanPlan[i].pNode = (SPhysiNode*)taosMemoryCalloc(1, sizeof(SPhysiNode));
scanPlan[i].msgType = TDMT_VND_QUERY;
scanPlan[i].msgType = TDMT_SCH_QUERY;
nodesListAppend(scanPlan[i].pParents, (SNode*)mergePlan);
nodesListAppend(mergePlan->pChildren, (SNode*)(scanPlan + i));
......@@ -198,7 +198,7 @@ void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) {
mergePlan->pParents = NULL;
mergePlan->pNode = (SPhysiNode*)taosMemoryCalloc(1, sizeof(SPhysiNode));
mergePlan->msgType = TDMT_VND_QUERY;
mergePlan->msgType = TDMT_SCH_QUERY;
nodesListAppend(merge->pNodeList, (SNode*)mergePlan);
......@@ -896,7 +896,7 @@ TEST(queryTest, flowCtrlCase) {
taosHashCancelIterate(pJob->execTasks, pIter);
if (task->lastMsgType == TDMT_VND_QUERY) {
if (task->lastMsgType == TDMT_SCH_QUERY) {
SQueryTableRsp rsp = {0};
code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册