From 14209eeec6ed1c40307240798713b3b80c814668 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 27 Jun 2022 19:34:50 +0800 Subject: [PATCH] feat: add no retry to query --- include/libs/transport/trpc.h | 2 +- source/client/src/clientEnv.c | 5 ++++- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 5 ++++- source/libs/function/src/udfd.c | 7 +++++-- source/libs/transport/inc/transportInt.h | 2 +- source/libs/transport/src/trans.c | 5 ----- source/libs/transport/src/transCli.c | 2 +- 7 files changed, 16 insertions(+), 12 deletions(-) diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index c2c1a3534d..2b8c6a895e 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -69,7 +69,7 @@ typedef struct SRpcMsg { } SRpcMsg; typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *rf); -typedef bool (*RpcRfp)(int32_t code); +typedef bool (*RpcRfp)(int32_t code, tmsg_t msgType); typedef struct SRpcInit { char localFqdn[TSDB_FQDN_LEN]; diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index d7bf4b60f1..8e0556125a 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -84,9 +84,12 @@ void closeTransporter(STscObj *pTscObj) { rpcClose(pTscObj->pAppInfo->pTransporter); } -static bool clientRpcRfp(int32_t code) { +static bool clientRpcRfp(int32_t code, tmsg_t msgType) { if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED || code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY) { + if (msgType == TDMT_VND_QUERY || msgType == TDMT_VND_FETCH) { + return false; + } return true; } else { return false; diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index a4745abd5b..7e31cc3144 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -248,9 +248,12 @@ static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) { } } -static bool rpcRfp(int32_t code) { +static bool rpcRfp(int32_t code, tmsg_t msgType) { if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED || code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY) { + if (msgType == TDMT_VND_QUERY || msgType == TDMT_VND_FETCH) { + return false; + } return true; } else { return false; diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 983cffe9dc..364ee0692f 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -110,7 +110,7 @@ static void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet); static int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf); static int32_t udfdConnectToMnode(); static int32_t udfdLoadUdf(char *udfName, SUdf *udf); -static bool udfdRpcRfp(int32_t code); +static bool udfdRpcRfp(int32_t code, tmsg_t msgType); static int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet); static int32_t udfdOpenClientRpc(); static int32_t udfdCloseClientRpc(); @@ -546,9 +546,12 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) { } return 0; } -static bool udfdRpcRfp(int32_t code) { +static bool udfdRpcRfp(int32_t code, tmsg_t msgType) { if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED || code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY) { + if (msgType == TDMT_VND_QUERY || msgType == TDMT_VND_FETCH) { + return false; + } return true; } else { return false; diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index c328629c4b..462debb247 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -52,7 +52,7 @@ typedef struct { char user[TSDB_UNI_LEN]; // meter ID void (*cfp)(void* parent, SRpcMsg*, SEpSet*); - bool (*retry)(int32_t code); + bool (*retry)(int32_t code, tmsg_t msgType); int index; void* parent; diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 4f7b19b539..cc2e95cfb3 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -164,11 +164,6 @@ void rpcSetDefaultAddr(void* thandle, const char* ip, const char* fqdn) { transSetDefaultAddr(thandle, ip, fqdn); } -// void rpcSetMsgTraceId(SRpcMsg* pMsg, STraceId uid) { -// SRpcHandleInfo* pInfo = &pMsg->info; -// pInfo->traceId = uid; -//} - int32_t rpcInit() { // impl later return 0; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 7374d1fffc..ab08ce82a8 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1030,7 +1030,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { */ STransConnCtx* pCtx = pMsg->ctx; int32_t code = pResp->code; - if (pTransInst->retry != NULL && pTransInst->retry(code)) { + if (pTransInst->retry != NULL && pTransInst->retry(code, pResp->msgType - 1)) { pMsg->sent = 0; pCtx->retryCnt += 1; if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { -- GitLab