提交 53efe751 编写于 作者: D dapan1121

feat: query redirect

上级 8dfe07e1
...@@ -239,7 +239,7 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t ...@@ -239,7 +239,7 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
#define NEED_REDIRECT_ERROR(_code) \ #define NEED_REDIRECT_ERROR(_code) \
((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || \ ((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || \
(_code) == TSDB_CODE_NODE_NOT_DEPLOYED || (_code) == TSDB_CODE_SYN_NOT_LEADER || \ (_code) == TSDB_CODE_NODE_NOT_DEPLOYED || (_code) == TSDB_CODE_SYN_NOT_LEADER || \
(_code) == TSDB_CODE_APP_NOT_READY) (_code) == TSDB_CODE_APP_NOT_READY || (_code) == TSDB_CODE_RPC_BROKEN_LINK)
#define NEED_CLIENT_RM_TBLMETA_REQ(_type) \ #define NEED_CLIENT_RM_TBLMETA_REQ(_type) \
((_type) == TDMT_VND_CREATE_TABLE || (_type) == TDMT_VND_CREATE_STB || (_type) == TDMT_VND_DROP_TABLE || \ ((_type) == TDMT_VND_CREATE_TABLE || (_type) == TDMT_VND_CREATE_STB || (_type) == TDMT_VND_DROP_TABLE || \
...@@ -249,8 +249,9 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t ...@@ -249,8 +249,9 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_NODE_NOT_DEPLOYED || \ ((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_NODE_NOT_DEPLOYED || \
(_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_APP_NOT_READY) (_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_APP_NOT_READY)
#define NEED_SCHEDULER_RETRY_ERROR(_code) \ #define NEED_SCHEDULER_RETRY_ERROR(_code) \
(NEED_SCHEDULER_REDIRECT_ERROR(_code) || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR) (NEED_SCHEDULER_REDIRECT_ERROR(_code) || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || \
(_code) == TSDB_CODE_SCH_TIMEOUT_ERROR || (_code) == TSDB_CODE_RPC_BROKEN_LINK)
......
...@@ -250,7 +250,7 @@ static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) { ...@@ -250,7 +250,7 @@ static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) {
static bool rpcRfp(int32_t code, tmsg_t msgType) { static bool rpcRfp(int32_t code, tmsg_t msgType) {
if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED || if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED ||
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY) { code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_BROKEN_LINK) {
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH) { if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH) {
return false; return false;
} }
......
...@@ -548,7 +548,7 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) { ...@@ -548,7 +548,7 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
} }
static bool udfdRpcRfp(int32_t code, tmsg_t msgType) { static bool udfdRpcRfp(int32_t code, tmsg_t msgType) {
if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED || if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED ||
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY) { code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_BROKEN_LINK) {
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH) { if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH) {
return false; return false;
} }
......
...@@ -205,7 +205,7 @@ int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) { ...@@ -205,7 +205,7 @@ int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
.msgType = TDMT_SCH_DROP_TASK, .msgType = TDMT_SCH_DROP_TASK,
.pCont = req, .pCont = req,
.contLen = sizeof(STaskDropReq), .contLen = sizeof(STaskDropReq),
.code = TSDB_CODE_RPC_NETWORK_UNAVAIL, .code = TSDB_CODE_RPC_BROKEN_LINK,
.info = *pConn, .info = *pConn,
}; };
...@@ -239,7 +239,7 @@ int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo * ...@@ -239,7 +239,7 @@ int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo *
.msgType = TDMT_SCH_QUERY_HEARTBEAT, .msgType = TDMT_SCH_QUERY_HEARTBEAT,
.pCont = msg, .pCont = msg,
.contLen = msgSize, .contLen = msgSize,
.code = TSDB_CODE_RPC_NETWORK_UNAVAIL, .code = TSDB_CODE_RPC_BROKEN_LINK,
.info = *pConn, .info = *pConn,
}; };
...@@ -484,7 +484,7 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int6 ...@@ -484,7 +484,7 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int6
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .code = pMsg->code, .connInfo = pMsg->info}; SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .code = pMsg->code, .connInfo = pMsg->info};
if (TSDB_CODE_RPC_NETWORK_UNAVAIL == pMsg->code) { if (TSDB_CODE_RPC_BROKEN_LINK == pMsg->code) {
QW_SCH_TASK_DLOG("receive drop task due to network broken, error:%s", tstrerror(pMsg->code)); QW_SCH_TASK_DLOG("receive drop task due to network broken, error:%s", tstrerror(pMsg->code));
} }
...@@ -522,7 +522,7 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_ ...@@ -522,7 +522,7 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_
uint64_t sId = req.sId; uint64_t sId = req.sId;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .code = pMsg->code, .connInfo = pMsg->info}; SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .code = pMsg->code, .connInfo = pMsg->info};
if (TSDB_CODE_RPC_NETWORK_UNAVAIL == pMsg->code) { if (TSDB_CODE_RPC_BROKEN_LINK == pMsg->code) {
QW_SCH_DLOG("receive Hb msg due to network broken, error:%s", tstrerror(pMsg->code)); QW_SCH_DLOG("receive Hb msg due to network broken, error:%s", tstrerror(pMsg->code));
} }
......
...@@ -306,6 +306,8 @@ extern SSchedulerMgmt schMgmt; ...@@ -306,6 +306,8 @@ extern SSchedulerMgmt schMgmt;
#define SCH_IS_WAIT_ALL_JOB(_job) (!SCH_IS_QUERY_JOB(_job)) #define SCH_IS_WAIT_ALL_JOB(_job) (!SCH_IS_QUERY_JOB(_job))
#define SCH_IS_NEED_DROP_JOB(_job) (SCH_IS_QUERY_JOB(_job)) #define SCH_IS_NEED_DROP_JOB(_job) (SCH_IS_QUERY_JOB(_job))
#define SCH_IS_EXPLAIN_JOB(_job) (EXPLAIN_MODE_ANALYZE == (_job)->attr.explainMode) #define SCH_IS_EXPLAIN_JOB(_job) (EXPLAIN_MODE_ANALYZE == (_job)->attr.explainMode)
#define SCH_NETWORK_ERR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL)
#define SCH_SUB_TASK_NETWORK_ERR(_code, _len) (((_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || (_code) == TSDB_CODE_RPC_BROKEN_LINK) && ((_len) > 0))
#define SCH_IS_LEVEL_UNFINISHED(_level) ((_level)->taskLaunchedNum < (_level)->taskNum) #define SCH_IS_LEVEL_UNFINISHED(_level) ((_level)->taskLaunchedNum < (_level)->taskNum)
#define SCH_GET_CUR_EP(_addr) (&(_addr)->epSet.eps[(_addr)->epSet.inUse]) #define SCH_GET_CUR_EP(_addr) (&(_addr)->epSet.eps[(_addr)->epSet.inUse])
......
...@@ -384,7 +384,7 @@ int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) { ...@@ -384,7 +384,7 @@ int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
goto _return; goto _return;
} }
bool dropExecNode = (msgType == TDMT_SCH_LINK_BROKEN || rspCode == TSDB_CODE_RPC_NETWORK_UNAVAIL); bool dropExecNode = (msgType == TDMT_SCH_LINK_BROKEN || SCH_NETWORK_ERR(rspCode));
SCH_ERR_JRET(schUpdateTaskHandle(pJob, pTask, dropExecNode, pMsg->handle, pParam->execIdx)); SCH_ERR_JRET(schUpdateTaskHandle(pJob, pTask, dropExecNode, pMsg->handle, pParam->execIdx));
int8_t status = 0; int8_t status = 0;
...@@ -396,7 +396,7 @@ int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) { ...@@ -396,7 +396,7 @@ int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
SCH_ERR_JRET(schValidateReceivedMsgType(pJob, pTask, msgType)); SCH_ERR_JRET(schValidateReceivedMsgType(pJob, pTask, msgType));
if (NEED_SCHEDULER_REDIRECT_ERROR(rspCode) || ((rspCode == TSDB_CODE_RPC_NETWORK_UNAVAIL) && pMsg->len > 0)) { if (NEED_SCHEDULER_REDIRECT_ERROR(rspCode) || SCH_SUB_TASK_NETWORK_ERR(rspCode, pMsg->len > 0)) {
code = schHandleRedirect(pJob, pTask, (SDataBuf *)pMsg, rspCode); code = schHandleRedirect(pJob, pTask, (SDataBuf *)pMsg, rspCode);
goto _return; goto _return;
} }
......
...@@ -328,7 +328,7 @@ void cliHandleResp(SCliConn* conn) { ...@@ -328,7 +328,7 @@ void cliHandleResp(SCliConn* conn) {
tDebug("%s conn %p construct ahandle %p by %s, persist: 1", CONN_GET_INST_LABEL(conn), conn, tDebug("%s conn %p construct ahandle %p by %s, persist: 1", CONN_GET_INST_LABEL(conn), conn,
transMsg.info.ahandle, TMSG_INFO(transMsg.msgType)); transMsg.info.ahandle, TMSG_INFO(transMsg.msgType));
if (!CONN_RELEASE_BY_SERVER(conn) && transMsg.info.ahandle == NULL) { if (!CONN_RELEASE_BY_SERVER(conn) && transMsg.info.ahandle == NULL) {
transMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; transMsg.code = TSDB_CODE_RPC_BROKEN_LINK;
transMsg.info.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType)); transMsg.info.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType));
tDebug("%s conn %p construct ahandle %p due brokenlink, persist: 1", CONN_GET_INST_LABEL(conn), conn, tDebug("%s conn %p construct ahandle %p due brokenlink, persist: 1", CONN_GET_INST_LABEL(conn), conn,
transMsg.info.ahandle); transMsg.info.ahandle);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册