提交 a3bf5b96 编写于 作者: D dapan1121

fix: fix query redirect issue

上级 5e18e07a
...@@ -208,6 +208,7 @@ enum { ...@@ -208,6 +208,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_SCH_QUERY_CONTINUE, "query-continue", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SCH_QUERY_CONTINUE, "query-continue", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SCH_QUERY_HEARTBEAT, "query-heartbeat", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SCH_QUERY_HEARTBEAT, "query-heartbeat", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SCH_FETCH, "fetch", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SCH_FETCH, "fetch", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SCH_MERGE_FETCH, "merge-fetch", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SCH_CANCEL_TASK, "cancel-task", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SCH_CANCEL_TASK, "cancel-task", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SCH_DROP_TASK, "drop-task", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SCH_DROP_TASK, "drop-task", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SCH_EXPLAIN, "explain", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SCH_EXPLAIN, "explain", NULL, NULL)
......
...@@ -351,6 +351,7 @@ typedef struct SDownstreamSourceNode { ...@@ -351,6 +351,7 @@ typedef struct SDownstreamSourceNode {
uint64_t taskId; uint64_t taskId;
uint64_t schedId; uint64_t schedId;
int32_t execId; int32_t execId;
int32_t fetchMsgType;
} SDownstreamSourceNode; } SDownstreamSourceNode;
typedef struct SExchangePhysiNode { typedef struct SExchangePhysiNode {
......
...@@ -102,7 +102,7 @@ void closeTransporter(SAppInstInfo *pAppInfo) { ...@@ -102,7 +102,7 @@ void closeTransporter(SAppInstInfo *pAppInfo) {
static bool clientRpcRfp(int32_t code, tmsg_t msgType) { static bool clientRpcRfp(int32_t code, tmsg_t msgType) {
if (NEED_REDIRECT_ERROR(code)) { if (NEED_REDIRECT_ERROR(code)) {
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH) { if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH || msgType == TDMT_SCH_MERGE_FETCH) {
return false; return false;
} }
return true; return true;
......
...@@ -214,6 +214,7 @@ SArray *mmGetMsgHandles() { ...@@ -214,6 +214,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_FETCH, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
......
...@@ -111,6 +111,7 @@ SArray *qmGetMsgHandles() { ...@@ -111,6 +111,7 @@ SArray *qmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, qmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, qmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, qmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, qmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_FETCH, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH_RSP, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH_RSP, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
......
...@@ -328,6 +328,7 @@ SArray *vmGetMsgHandles() { ...@@ -328,6 +328,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_UPDATE_TAG_VAL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_UPDATE_TAG_VAL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
......
...@@ -42,7 +42,7 @@ static inline void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) { ...@@ -42,7 +42,7 @@ static inline void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) {
static inline void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet) { static inline void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet) {
pMsg->info.hasEpSet = 1; pMsg->info.hasEpSet = 1;
SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info}; SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info, .msgType = pMsg->msgType};
int32_t contLen = tSerializeSEpSet(NULL, 0, pNewEpSet); int32_t contLen = tSerializeSEpSet(NULL, 0, pNewEpSet);
rsp.pCont = rpcMallocCont(contLen); rsp.pCont = rpcMallocCont(contLen);
...@@ -88,6 +88,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { ...@@ -88,6 +88,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
case TDMT_MND_SYSTABLE_RETRIEVE_RSP: case TDMT_MND_SYSTABLE_RETRIEVE_RSP:
case TDMT_DND_SYSTABLE_RETRIEVE_RSP: case TDMT_DND_SYSTABLE_RETRIEVE_RSP:
case TDMT_SCH_FETCH_RSP: case TDMT_SCH_FETCH_RSP:
case TDMT_SCH_MERGE_FETCH_RSP:
qWorkerProcessFetchRsp(NULL, NULL, pRpc, 0); qWorkerProcessFetchRsp(NULL, NULL, pRpc, 0);
return; return;
case TDMT_MND_STATUS_RSP: case TDMT_MND_STATUS_RSP:
...@@ -253,7 +254,7 @@ static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) { ...@@ -253,7 +254,7 @@ static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) {
static bool rpcRfp(int32_t code, tmsg_t msgType) { static bool rpcRfp(int32_t code, tmsg_t msgType) {
if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED || if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED ||
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_BROKEN_LINK) { code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_BROKEN_LINK) {
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH) { if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH || msgType == TDMT_SCH_MERGE_FETCH) {
return false; return false;
} }
return true; return true;
......
...@@ -531,7 +531,8 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) { ...@@ -531,7 +531,8 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
if (!IsReq(pMsg)) return 0; if (!IsReq(pMsg)) return 0;
if (pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_SCH_MERGE_QUERY || if (pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_SCH_MERGE_QUERY ||
pMsg->msgType == TDMT_SCH_QUERY_CONTINUE || pMsg->msgType == TDMT_SCH_QUERY_HEARTBEAT || pMsg->msgType == TDMT_SCH_QUERY_CONTINUE || pMsg->msgType == TDMT_SCH_QUERY_HEARTBEAT ||
pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_SCH_DROP_TASK) { pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_SCH_MERGE_FETCH ||
pMsg->msgType == TDMT_SCH_DROP_TASK) {
return 0; return 0;
} }
if (mndAcquireRpcRef(pMsg->info.node) == 0) return 0; if (mndAcquireRpcRef(pMsg->info.node) == 0) return 0;
......
...@@ -45,6 +45,7 @@ int32_t mndProcessQueryMsg(SRpcMsg *pMsg) { ...@@ -45,6 +45,7 @@ int32_t mndProcessQueryMsg(SRpcMsg *pMsg) {
code = qWorkerProcessCQueryMsg(&handle, pMnode->pQuery, pMsg, 0); code = qWorkerProcessCQueryMsg(&handle, pMnode->pQuery, pMsg, 0);
break; break;
case TDMT_SCH_FETCH: case TDMT_SCH_FETCH:
case TDMT_SCH_MERGE_FETCH:
code = qWorkerProcessFetchMsg(pMnode, pMnode->pQuery, pMsg, 0); code = qWorkerProcessFetchMsg(pMnode, pMnode->pQuery, pMsg, 0);
break; break;
case TDMT_SCH_DROP_TASK: case TDMT_SCH_DROP_TASK:
...@@ -72,6 +73,7 @@ int32_t mndInitQuery(SMnode *pMnode) { ...@@ -72,6 +73,7 @@ int32_t mndInitQuery(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_SCH_MERGE_QUERY, mndProcessQueryMsg); mndSetMsgHandle(pMnode, TDMT_SCH_MERGE_QUERY, mndProcessQueryMsg);
mndSetMsgHandle(pMnode, TDMT_SCH_QUERY_CONTINUE, mndProcessQueryMsg); mndSetMsgHandle(pMnode, TDMT_SCH_QUERY_CONTINUE, mndProcessQueryMsg);
mndSetMsgHandle(pMnode, TDMT_SCH_FETCH, mndProcessQueryMsg); mndSetMsgHandle(pMnode, TDMT_SCH_FETCH, mndProcessQueryMsg);
mndSetMsgHandle(pMnode, TDMT_SCH_MERGE_FETCH, mndProcessQueryMsg);
mndSetMsgHandle(pMnode, TDMT_SCH_DROP_TASK, mndProcessQueryMsg); mndSetMsgHandle(pMnode, TDMT_SCH_DROP_TASK, mndProcessQueryMsg);
mndSetMsgHandle(pMnode, TDMT_SCH_QUERY_HEARTBEAT, mndProcessQueryMsg); mndSetMsgHandle(pMnode, TDMT_SCH_QUERY_HEARTBEAT, mndProcessQueryMsg);
......
...@@ -86,6 +86,7 @@ int32_t qndProcessQueryMsg(SQnode *pQnode, int64_t ts, SRpcMsg *pMsg) { ...@@ -86,6 +86,7 @@ int32_t qndProcessQueryMsg(SQnode *pQnode, int64_t ts, SRpcMsg *pMsg) {
code = qWorkerProcessCQueryMsg(&handle, pQnode->pQuery, pMsg, ts); code = qWorkerProcessCQueryMsg(&handle, pQnode->pQuery, pMsg, ts);
break; break;
case TDMT_SCH_FETCH: case TDMT_SCH_FETCH:
case TDMT_SCH_MERGE_FETCH:
code = qWorkerProcessFetchMsg(pQnode, pQnode->pQuery, pMsg, ts); code = qWorkerProcessFetchMsg(pQnode, pQnode->pQuery, pMsg, ts);
break; break;
case TDMT_SCH_FETCH_RSP: case TDMT_SCH_FETCH_RSP:
......
...@@ -225,7 +225,7 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -225,7 +225,7 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
vTrace("message in vnode query queue is processing"); vTrace("message in vnode query queue is processing");
if (!vnodeIsLeader(pVnode)) { if ((pMsg->msgType == TDMT_SCH_QUERY) && !vnodeIsLeader(pVnode)) {
vnodeRedirectRpcMsg(pVnode, pMsg); vnodeRedirectRpcMsg(pVnode, pMsg);
return 0; return 0;
} }
...@@ -245,7 +245,8 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -245,7 +245,8 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
vTrace("message in fetch queue is processing"); vTrace("message in fetch queue is processing");
if (!vnodeIsLeader(pVnode)) { if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG)
&& !vnodeIsLeader(pVnode)) {
vnodeRedirectRpcMsg(pVnode, pMsg); vnodeRedirectRpcMsg(pVnode, pMsg);
return 0; return 0;
} }
...@@ -255,6 +256,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { ...@@ -255,6 +256,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
switch (pMsg->msgType) { switch (pMsg->msgType) {
case TDMT_SCH_FETCH: case TDMT_SCH_FETCH:
case TDMT_SCH_MERGE_FETCH:
return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg, 0); return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg, 0);
case TDMT_SCH_FETCH_RSP: case TDMT_SCH_FETCH_RSP:
return qWorkerProcessFetchRsp(pVnode, pVnode->pQuery, pMsg, 0); return qWorkerProcessFetchRsp(pVnode, pVnode->pQuery, pMsg, 0);
......
...@@ -133,7 +133,7 @@ void vnodeRedirectRpcMsg(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -133,7 +133,7 @@ void vnodeRedirectRpcMsg(SVnode *pVnode, SRpcMsg *pMsg) {
} }
pMsg->info.hasEpSet = 1; pMsg->info.hasEpSet = 1;
SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info}; SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info, .msgType = pMsg->msgType + 1};
tmsgSendRedirectRsp(&rsp, &newEpSet); tmsgSendRedirectRsp(&rsp, &newEpSet);
} }
......
...@@ -2053,7 +2053,7 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf ...@@ -2053,7 +2053,7 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf
pMsgSendInfo->param = pWrapper; pMsgSendInfo->param = pWrapper;
pMsgSendInfo->msgInfo.pData = pMsg; pMsgSendInfo->msgInfo.pData = pMsg;
pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq); pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq);
pMsgSendInfo->msgType = TDMT_SCH_FETCH; pMsgSendInfo->msgType = pSource->fetchMsgType;
pMsgSendInfo->fp = loadRemoteDataCallback; pMsgSendInfo->fp = loadRemoteDataCallback;
int64_t transporterId = 0; int64_t transporterId = 0;
......
...@@ -549,7 +549,7 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) { ...@@ -549,7 +549,7 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
static bool udfdRpcRfp(int32_t code, tmsg_t msgType) { static bool udfdRpcRfp(int32_t code, tmsg_t msgType) {
if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED || if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED ||
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_BROKEN_LINK) { code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_BROKEN_LINK) {
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH) { if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH || msgType == TDMT_SCH_MERGE_FETCH) {
return false; return false;
} }
return true; return true;
......
...@@ -593,6 +593,7 @@ static int32_t downstreamSourceCopy(const SDownstreamSourceNode* pSrc, SDownstre ...@@ -593,6 +593,7 @@ static int32_t downstreamSourceCopy(const SDownstreamSourceNode* pSrc, SDownstre
COPY_SCALAR_FIELD(taskId); COPY_SCALAR_FIELD(taskId);
COPY_SCALAR_FIELD(schedId); COPY_SCALAR_FIELD(schedId);
COPY_SCALAR_FIELD(execId); COPY_SCALAR_FIELD(execId);
COPY_SCALAR_FIELD(fetchMsgType);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -3524,6 +3524,7 @@ static const char* jkDownstreamSourceAddr = "Addr"; ...@@ -3524,6 +3524,7 @@ static const char* jkDownstreamSourceAddr = "Addr";
static const char* jkDownstreamSourceTaskId = "TaskId"; static const char* jkDownstreamSourceTaskId = "TaskId";
static const char* jkDownstreamSourceSchedId = "SchedId"; static const char* jkDownstreamSourceSchedId = "SchedId";
static const char* jkDownstreamSourceExecId = "ExecId"; static const char* jkDownstreamSourceExecId = "ExecId";
static const char* jkDownstreamSourceFetchMsgType = "FetchMsgType";
static int32_t downstreamSourceNodeToJson(const void* pObj, SJson* pJson) { static int32_t downstreamSourceNodeToJson(const void* pObj, SJson* pJson) {
const SDownstreamSourceNode* pNode = (const SDownstreamSourceNode*)pObj; const SDownstreamSourceNode* pNode = (const SDownstreamSourceNode*)pObj;
...@@ -3538,6 +3539,9 @@ static int32_t downstreamSourceNodeToJson(const void* pObj, SJson* pJson) { ...@@ -3538,6 +3539,9 @@ static int32_t downstreamSourceNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkDownstreamSourceExecId, pNode->execId); code = tjsonAddIntegerToObject(pJson, jkDownstreamSourceExecId, pNode->execId);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkDownstreamSourceFetchMsgType, pNode->fetchMsgType);
}
return code; return code;
} }
...@@ -3555,6 +3559,9 @@ static int32_t jsonToDownstreamSourceNode(const SJson* pJson, void* pObj) { ...@@ -3555,6 +3559,9 @@ static int32_t jsonToDownstreamSourceNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkDownstreamSourceExecId, &pNode->execId); code = tjsonGetIntValue(pJson, jkDownstreamSourceExecId, &pNode->execId);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkDownstreamSourceFetchMsgType, &pNode->fetchMsgType);
}
return code; return code;
} }
......
...@@ -123,6 +123,7 @@ typedef struct SQWTaskCtx { ...@@ -123,6 +123,7 @@ typedef struct SQWTaskCtx {
int8_t taskType; int8_t taskType;
int8_t explain; int8_t explain;
int32_t queryType; int32_t queryType;
int32_t fetchType;
int32_t execId; int32_t execId;
bool queryFetched; bool queryFetched;
......
...@@ -35,8 +35,7 @@ int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SRpcMsg *pRsp, SDeleteRes ...@@ -35,8 +35,7 @@ int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SRpcMsg *pRsp, SDeleteRes
int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code); int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code);
int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code); int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code);
int32_t qwBuildAndSendFetchRsp(SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code);
int32_t code);
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete); void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete);
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn); int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn);
int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, STbVerInfo* tbInfo); int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, STbVerInfo* tbInfo);
......
...@@ -104,7 +104,7 @@ int32_t qwBuildAndSendHbRsp(SRpcHandleInfo *pConn, SSchedulerHbRsp *pStatus, int ...@@ -104,7 +104,7 @@ int32_t qwBuildAndSendHbRsp(SRpcHandleInfo *pConn, SSchedulerHbRsp *pStatus, int
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwBuildAndSendFetchRsp(SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code) { int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code) {
if (NULL == pRsp) { if (NULL == pRsp) {
pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
memset(pRsp, 0, sizeof(SRetrieveTableRsp)); memset(pRsp, 0, sizeof(SRetrieveTableRsp));
...@@ -112,7 +112,7 @@ int32_t qwBuildAndSendFetchRsp(SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, i ...@@ -112,7 +112,7 @@ int32_t qwBuildAndSendFetchRsp(SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, i
} }
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.msgType = TDMT_SCH_FETCH_RSP, .msgType = rspType,
.pCont = pRsp, .pCont = pRsp,
.contLen = sizeof(*pRsp) + dataLength, .contLen = sizeof(*pRsp) + dataLength,
.code = code, .code = code,
...@@ -436,7 +436,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int ...@@ -436,7 +436,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
int64_t rId = 0; int64_t rId = 0;
int32_t eId = msg->execId; int32_t eId = msg->execId;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connInfo = pMsg->info}; SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connInfo = pMsg->info, .msgType = pMsg->msgType};
QW_SCH_TASK_DLOG("processFetch start, node:%p, handle:%p", node, pMsg->info.handle); QW_SCH_TASK_DLOG("processFetch start, node:%p, handle:%p", node, pMsg->info.handle);
......
...@@ -617,7 +617,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { ...@@ -617,7 +617,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
qwMsg->connInfo = ctx->dataConnInfo; qwMsg->connInfo = ctx->dataConnInfo;
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH); QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, dataLen, code); qwBuildAndSendFetchRsp(ctx->fetchType, &qwMsg->connInfo, rsp, dataLen, code);
rsp = NULL; rsp = NULL;
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code,
...@@ -639,7 +639,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { ...@@ -639,7 +639,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
rsp = NULL; rsp = NULL;
qwMsg->connInfo = ctx->dataConnInfo; qwMsg->connInfo = ctx->dataConnInfo;
qwBuildAndSendFetchRsp(&qwMsg->connInfo, NULL, 0, code); qwBuildAndSendFetchRsp(ctx->fetchType, &qwMsg->connInfo, NULL, 0, code);
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code),
0); 0);
} }
...@@ -672,6 +672,8 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) { ...@@ -672,6 +672,8 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx)); QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
ctx->queryType = qwMsg->msgType;
SOutputData sOutput = {0}; SOutputData sOutput = {0};
QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)); QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));
...@@ -722,7 +724,7 @@ _return: ...@@ -722,7 +724,7 @@ _return:
} }
if (code || rsp) { if (code || rsp) {
qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, dataLen, code); qwBuildAndSendFetchRsp(qwMsg->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code);
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code),
dataLen); dataLen);
} }
......
...@@ -214,7 +214,8 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) { ...@@ -214,7 +214,8 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) {
rpcFreeCont(rsp); rpcFreeCont(rsp);
break; break;
} }
case TDMT_SCH_FETCH_RSP: { case TDMT_SCH_FETCH_RSP:
case TDMT_SCH_MERGE_FETCH_RSP: {
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)pRsp->pCont; SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)pRsp->pCont;
if (0 == pRsp->code && 0 == rsp->completed) { if (0 == pRsp->code && 0 == rsp->completed) {
...@@ -815,6 +816,7 @@ void *fetchQueueThread(void *param) { ...@@ -815,6 +816,7 @@ void *fetchQueueThread(void *param) {
switch (fetchRpc->msgType) { switch (fetchRpc->msgType) {
case TDMT_SCH_FETCH: case TDMT_SCH_FETCH:
case TDMT_SCH_MERGE_FETCH:
qWorkerProcessFetchMsg(mockPointer, mgmt, fetchRpc, 0); qWorkerProcessFetchMsg(mockPointer, mgmt, fetchRpc, 0);
break; break;
case TDMT_SCH_CANCEL_TASK: case TDMT_SCH_CANCEL_TASK:
......
...@@ -35,7 +35,7 @@ extern "C" { ...@@ -35,7 +35,7 @@ extern "C" {
#define SCH_DEFAULT_TASK_TIMEOUT_USEC 10000000 #define SCH_DEFAULT_TASK_TIMEOUT_USEC 10000000
#define SCH_MAX_TASK_TIMEOUT_USEC 60000000 #define SCH_MAX_TASK_TIMEOUT_USEC 60000000
#define SCH_TASK_MAX_EXEC_TIMES 5 #define SCH_TASK_MAX_EXEC_TIMES 8
#define SCH_MAX_CANDIDATE_EP_NUM TSDB_MAX_REPLICA #define SCH_MAX_CANDIDATE_EP_NUM TSDB_MAX_REPLICA
enum { enum {
...@@ -318,6 +318,7 @@ extern SSchedulerMgmt schMgmt; ...@@ -318,6 +318,7 @@ extern SSchedulerMgmt schMgmt;
#define SCH_SET_JOB_NEED_FLOW_CTRL(_job) (_job)->attr.needFlowCtrl = true #define SCH_SET_JOB_NEED_FLOW_CTRL(_job) (_job)->attr.needFlowCtrl = true
#define SCH_JOB_NEED_FLOW_CTRL(_job) ((_job)->attr.needFlowCtrl) #define SCH_JOB_NEED_FLOW_CTRL(_job) ((_job)->attr.needFlowCtrl)
#define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_SRC_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEVEL_UNFINISHED((_task)->level)) #define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_SRC_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEVEL_UNFINISHED((_task)->level))
#define SCH_FETCH_TYPE(_pSrcTask) (SCH_IS_DATA_SRC_QRY_TASK(_pSrcTask) ? TDMT_SCH_FETCH : TDMT_SCH_MERGE_FETCH)
#define SCH_SET_JOB_TYPE(_job, type) do { if ((type) != SUBPLAN_TYPE_MODIFY) { (_job)->attr.queryJob = true; } } while (0) #define SCH_SET_JOB_TYPE(_job, type) do { if ((type) != SUBPLAN_TYPE_MODIFY) { (_job)->attr.queryJob = true; } } while (0)
#define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob) #define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob)
...@@ -327,7 +328,7 @@ extern SSchedulerMgmt schMgmt; ...@@ -327,7 +328,7 @@ extern SSchedulerMgmt schMgmt;
#define SCH_IS_EXPLAIN_JOB(_job) (EXPLAIN_MODE_ANALYZE == (_job)->attr.explainMode) #define SCH_IS_EXPLAIN_JOB(_job) (EXPLAIN_MODE_ANALYZE == (_job)->attr.explainMode)
#define SCH_NETWORK_ERR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL) #define SCH_NETWORK_ERR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL)
#define SCH_SUB_TASK_NETWORK_ERR(_code, _len) (SCH_NETWORK_ERR(_code) && ((_len) > 0)) #define SCH_SUB_TASK_NETWORK_ERR(_code, _len) (SCH_NETWORK_ERR(_code) && ((_len) > 0))
#define SCH_NEED_REDIRECT_MSGTYPE(_msgType) ((_msgType) == TDMT_SCH_QUERY || (_msgType) == TDMT_SCH_MERGE_QUERY || (_msgType) == TDMT_SCH_FETCH) #define SCH_NEED_REDIRECT_MSGTYPE(_msgType) ((_msgType) == TDMT_SCH_QUERY || (_msgType) == TDMT_SCH_MERGE_QUERY || (_msgType) == TDMT_SCH_FETCH || (_msgType) == TDMT_SCH_MERGE_FETCH)
#define SCH_NEED_REDIRECT(_msgType, _code, _rspLen) (SCH_NEED_REDIRECT_MSGTYPE(_msgType) && (NEED_SCHEDULER_REDIRECT_ERROR(_code) || SCH_SUB_TASK_NETWORK_ERR(_code, _rspLen))) #define SCH_NEED_REDIRECT(_msgType, _code, _rspLen) (SCH_NEED_REDIRECT_MSGTYPE(_msgType) && (NEED_SCHEDULER_REDIRECT_ERROR(_code) || SCH_SUB_TASK_NETWORK_ERR(_code, _rspLen)))
#define SCH_NEED_RETRY(_msgType, _code) ((SCH_NETWORK_ERR(_code) && SCH_NEED_REDIRECT_MSGTYPE(_msgType)) || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR) #define SCH_NEED_RETRY(_msgType, _code) ((SCH_NETWORK_ERR(_code) && SCH_NEED_REDIRECT_MSGTYPE(_msgType)) || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR)
......
...@@ -45,6 +45,7 @@ int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgTy ...@@ -45,6 +45,7 @@ int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgTy
//SCH_SET_TASK_LASTMSG_TYPE(pTask, -1); //SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
case TDMT_SCH_FETCH_RSP: case TDMT_SCH_FETCH_RSP:
case TDMT_SCH_MERGE_FETCH_RSP:
if (lastMsgType != reqMsgType && -1 != lastMsgType) { if (lastMsgType != reqMsgType && -1 != lastMsgType) {
SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType), SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType),
TMSG_INFO(msgType)); TMSG_INFO(msgType));
...@@ -299,7 +300,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa ...@@ -299,7 +300,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
} }
break; break;
} }
case TDMT_SCH_FETCH_RSP: { case TDMT_SCH_FETCH_RSP:
case TDMT_SCH_MERGE_FETCH_RSP: {
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg; SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;
SCH_ERR_JRET(rspCode); SCH_ERR_JRET(rspCode);
...@@ -550,6 +552,7 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) { ...@@ -550,6 +552,7 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
case TDMT_VND_DELETE: case TDMT_VND_DELETE:
case TDMT_SCH_EXPLAIN: case TDMT_SCH_EXPLAIN:
case TDMT_SCH_FETCH: case TDMT_SCH_FETCH:
case TDMT_SCH_MERGE_FETCH:
*fp = schHandleCallback; *fp = schHandleCallback;
break; break;
case TDMT_SCH_DROP_TASK: case TDMT_SCH_DROP_TASK:
...@@ -1007,7 +1010,8 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, ...@@ -1007,7 +1010,8 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
persistHandle = true; persistHandle = true;
break; break;
} }
case TDMT_SCH_FETCH: { case TDMT_SCH_FETCH:
case TDMT_SCH_MERGE_FETCH: {
msgSize = sizeof(SResFetchReq); msgSize = sizeof(SResFetchReq);
msg = taosMemoryCalloc(1, msgSize); msg = taosMemoryCalloc(1, msgSize);
if (NULL == msg) { if (NULL == msg) {
......
...@@ -258,7 +258,9 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { ...@@ -258,7 +258,9 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
.taskId = pTask->taskId, .taskId = pTask->taskId,
.schedId = schMgmt.sId, .schedId = schMgmt.sId,
.execId = pTask->execId, .execId = pTask->execId,
.addr = pTask->succeedAddr}; .addr = pTask->succeedAddr,
.fetchMsgType = SCH_FETCH_TYPE(pTask),
};
qSetSubplanExecutionNode(parent->plan, pTask->plan->id.groupId, &source); qSetSubplanExecutionNode(parent->plan, pTask->plan->id.groupId, &source);
SCH_UNLOCK(SCH_WRITE, &parent->lock); SCH_UNLOCK(SCH_WRITE, &parent->lock);
...@@ -818,7 +820,7 @@ int32_t schLaunchFetchTask(SSchJob *pJob) { ...@@ -818,7 +820,7 @@ int32_t schLaunchFetchTask(SSchJob *pJob) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SCH_ERR_JRET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, TDMT_SCH_FETCH)); SCH_ERR_JRET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, SCH_FETCH_TYPE(pJob->fetchTask)));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册