未验证 提交 92a21fd7 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #18651 from taosdata/enh/TD-20774

enh: remove TSDB_CODE_RPC_REDIRECT
......@@ -43,7 +43,6 @@ typedef int32_t (*PutToQueueFp)(void* pMgmt, EQueueType qtype, SRpcMsg* pMsg);
typedef int32_t (*GetQueueSizeFp)(void* pMgmt, int32_t vgId, EQueueType qtype);
typedef int32_t (*SendReqFp)(const SEpSet* pEpSet, SRpcMsg* pMsg);
typedef void (*SendRspFp)(SRpcMsg* pMsg);
typedef void (*SendRedirectRspFp)(SRpcMsg* pMsg, const SEpSet* pNewEpSet);
typedef void (*RegisterBrokenLinkArgFp)(SRpcMsg* pMsg);
typedef void (*ReleaseHandleFp)(SRpcHandleInfo* pHandle, int8_t type);
typedef void (*ReportStartup)(const char* name, const char* desc);
......@@ -55,7 +54,6 @@ typedef struct {
GetQueueSizeFp qsizeFp;
SendReqFp sendReqFp;
SendRspFp sendRspFp;
SendRedirectRspFp sendRedirectRspFp;
RegisterBrokenLinkArgFp registerBrokenLinkArgFp;
ReleaseHandleFp releaseHandleFp;
ReportStartup reportStartupFp;
......@@ -66,7 +64,6 @@ int32_t tmsgPutToQueue(const SMsgCb* msgcb, EQueueType qtype, SRpcMsg* pMsg);
int32_t tmsgGetQueueSize(const SMsgCb* msgcb, int32_t vgId, EQueueType qtype);
int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pMsg);
void tmsgSendRsp(SRpcMsg* pMsg);
void tmsgSendRedirectRsp(SRpcMsg* pMsg, const SEpSet* pNewEpSet);
void tmsgRegisterBrokenLinkArg(SRpcMsg* pMsg);
void tmsgReleaseHandle(SRpcHandleInfo* pHandle, int8_t type);
void tmsgReportStartup(const char* name, const char* desc);
......
......@@ -261,24 +261,22 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code))
#define SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR || (_code) == TSDB_CODE_VND_STOPPED)
#define SYNC_SELF_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR)
#define SYNC_SELF_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_RESTORING || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR)
#define SYNC_OTHER_LEADER_REDIRECT_ERROR(_code) (false) // used later
#define NEED_REDIRECT_ERROR(_code) \
((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || \
((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || \
(_code) == TSDB_CODE_MNODE_NOT_FOUND || SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) || \
SYNC_SELF_LEADER_REDIRECT_ERROR(_code) || SYNC_OTHER_LEADER_REDIRECT_ERROR(_code) || \
(_code) == TSDB_CODE_SYN_RESTORING || (_code) == TSDB_CODE_RPC_BROKEN_LINK || \
(_code) == TSDB_CODE_APP_IS_STARTING || (_code) == TSDB_CODE_APP_IS_STOPPING)
#define NEED_CLIENT_RM_TBLMETA_REQ(_type) \
((_type) == TDMT_VND_CREATE_TABLE || (_type) == TDMT_MND_CREATE_STB || (_type) == TDMT_VND_DROP_TABLE || \
(_type) == TDMT_MND_DROP_STB)
#define NEED_SCHEDULER_REDIRECT_ERROR(_code) \
((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_MNODE_NOT_FOUND || \
SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) || SYNC_SELF_LEADER_REDIRECT_ERROR(_code) || \
SYNC_OTHER_LEADER_REDIRECT_ERROR(_code) || (_code) == TSDB_CODE_SYN_RESTORING || \
#define NEED_SCHEDULER_REDIRECT_ERROR(_code) \
((_code) == TSDB_CODE_MNODE_NOT_FOUND || SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) || \
SYNC_SELF_LEADER_REDIRECT_ERROR(_code) || SYNC_OTHER_LEADER_REDIRECT_ERROR(_code) || \
(_code) == TSDB_CODE_APP_IS_STARTING || (_code) == TSDB_CODE_APP_IS_STOPPING)
#define REQUEST_TOTAL_EXEC_TIMES 2
......
......@@ -40,61 +40,61 @@ int32_t* taosGetErrno();
#define TSDB_CODE_FAILED -1 // unknown or needn't tell detail error
// rpc
// #define TSDB_CODE_RPC_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0001) //2.x
// #define TSDB_CODE_RPC_AUTH_REQUIRED TAOS_DEF_ERROR_CODE(0, 0x0002) //2.x
#define TSDB_CODE_RPC_AUTH_FAILURE TAOS_DEF_ERROR_CODE(0, 0x0003)
#define TSDB_CODE_RPC_REDIRECT TAOS_DEF_ERROR_CODE(0, 0x0004)
// #define TSDB_CODE_RPC_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0005) //2.x
// #define TSDB_CODE_RPC_ALREADY_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0006) //2.x
// #define TSDB_CODE_RPC_LAST_SESSION_NOT_FINI. TAOS_DEF_ERROR_CODE(0, 0x0007) //2.x
// #define TSDB_CODE_RPC_MISMATCHED_LINK_ID TAOS_DEF_ERROR_CODE(0, 0x0008) //2.x
// #define TSDB_CODE_RPC_TOO_SLOW TAOS_DEF_ERROR_CODE(0, 0x0009) //2.x
// #define TSDB_CODE_RPC_MAX_SESSIONS TAOS_DEF_ERROR_CODE(0, 0x000A) //2.x
// #define TSDB_CODE_RPC_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0001) // 2.x
// #define TSDB_CODE_RPC_AUTH_REQUIRED TAOS_DEF_ERROR_CODE(0, 0x0002) // 2.x
// #define TSDB_CODE_RPC_AUTH_FAILURE TAOS_DEF_ERROR_CODE(0, 0x0003) // 2.x
// #define TSDB_CODE_RPC_REDIRECT TAOS_DEF_ERROR_CODE(0, 0x0004) // 2.x
// #define TSDB_CODE_RPC_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0005) // 2.x
// #define TSDB_CODE_RPC_ALREADY_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0006) // 2.x
// #define TSDB_CODE_RPC_LAST_SESSION_NOT_FINI. TAOS_DEF_ERROR_CODE(0, 0x0007) // 2.x
// #define TSDB_CODE_RPC_MISMATCHED_LINK_ID TAOS_DEF_ERROR_CODE(0, 0x0008) // 2.x
// #define TSDB_CODE_RPC_TOO_SLOW TAOS_DEF_ERROR_CODE(0, 0x0009) // 2.x
// #define TSDB_CODE_RPC_MAX_SESSIONS TAOS_DEF_ERROR_CODE(0, 0x000A) // 2.x
#define TSDB_CODE_RPC_NETWORK_UNAVAIL TAOS_DEF_ERROR_CODE(0, 0x000B)
// #define TSDB_CODE_RPC_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x000C) //2.x
// #define TSDB_CODE_RPC_UNEXPECTED_RESPONSE TAOS_DEF_ERROR_CODE(0, 0x000D) //2.x
// #define TSDB_CODE_RPC_INVALID_VALUE TAOS_DEF_ERROR_CODE(0, 0x000E) //2.x
// #define TSDB_CODE_RPC_INVALID_TRAN_ID TAOS_DEF_ERROR_CODE(0, 0x000F) //2.x
// #define TSDB_CODE_RPC_INVALID_SESSION_ID TAOS_DEF_ERROR_CODE(0, 0x0010) //2.x
// #define TSDB_CODE_RPC_INVALID_MSG_TYPE TAOS_DEF_ERROR_CODE(0, 0x0011) //2.x
// #define TSDB_CODE_RPC_INVALID_RESPONSE_TYPE TAOS_DEF_ERROR_CODE(0, 0x0012) //2.x
#define TSDB_CODE_TIME_UNSYNCED TAOS_DEF_ERROR_CODE(0, 0x0013)
// #define TSDB_CODE_APP_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0014) //2.x
// #define TSDB_CODE_RPC_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x000C) // 2.x
// #define TSDB_CODE_RPC_UNEXPECTED_RESPONSE TAOS_DEF_ERROR_CODE(0, 0x000D) // 2.x
// #define TSDB_CODE_RPC_INVALID_VALUE TAOS_DEF_ERROR_CODE(0, 0x000E) // 2.x
// #define TSDB_CODE_RPC_INVALID_TRAN_ID TAOS_DEF_ERROR_CODE(0, 0x000F) // 2.x
// #define TSDB_CODE_RPC_INVALID_SESSION_ID TAOS_DEF_ERROR_CODE(0, 0x0010) // 2.x
// #define TSDB_CODE_RPC_INVALID_MSG_TYPE TAOS_DEF_ERROR_CODE(0, 0x0011) // 2.x
// #define TSDB_CODE_RPC_INVALID_RESPONSE_TYPE TAOS_DEF_ERROR_CODE(0, 0x0012) // 2.x
#define TSDB_CODE_TIME_UNSYNCED TAOS_DEF_ERROR_CODE(0, 0x0013) //
// #define TSDB_CODE_APP_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0014) // 2.x
#define TSDB_CODE_RPC_FQDN_ERROR TAOS_DEF_ERROR_CODE(0, 0x0015)
// #define TSDB_CODE_RPC_INVALID_VERSION TAOS_DEF_ERROR_CODE(0, 0x0016) //2.x
#define TSDB_CODE_RPC_PORT_EADDRINUSE TAOS_DEF_ERROR_CODE(0, 0x0017)
#define TSDB_CODE_RPC_BROKEN_LINK TAOS_DEF_ERROR_CODE(0, 0x0018)
#define TSDB_CODE_RPC_TIMEOUT TAOS_DEF_ERROR_CODE(0, 0x0019)
// #define TSDB_CODE_RPC_INVALID_VERSION TAOS_DEF_ERROR_CODE(0, 0x0016) // 2.x
#define TSDB_CODE_RPC_PORT_EADDRINUSE TAOS_DEF_ERROR_CODE(0, 0x0017) //
#define TSDB_CODE_RPC_BROKEN_LINK TAOS_DEF_ERROR_CODE(0, 0x0018) //
#define TSDB_CODE_RPC_TIMEOUT TAOS_DEF_ERROR_CODE(0, 0x0019) //
//common & util
#define TSDB_CODE_OPS_NOT_SUPPORT TAOS_DEF_ERROR_CODE(0, 0x0100)
#define TSDB_CODE_MEMORY_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x0101)
#define TSDB_CODE_OPS_NOT_SUPPORT TAOS_DEF_ERROR_CODE(0, 0x0100) //
// #define TSDB_CODE_MEMORY_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x0101) // 2.x
#define TSDB_CODE_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0102)
// #define TSDB_CODE_COM_INVALID_CFG_MSG TAOS_DEF_ERROR_CODE(0, 0x0103) // 2.x
#define TSDB_CODE_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x0104)
#define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0105)
#define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0106)
#define TSDB_CODE_REF_ID_REMOVED TAOS_DEF_ERROR_CODE(0, 0x0107)
#define TSDB_CODE_REF_INVALID_ID TAOS_DEF_ERROR_CODE(0, 0x0108)
#define TSDB_CODE_REF_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0109)
#define TSDB_CODE_REF_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x010A)
#define TSDB_CODE_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0110)
#define TSDB_CODE_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0111)
#define TSDB_CODE_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x0112)
#define TSDB_CODE_OUT_OF_SHM_MEM TAOS_DEF_ERROR_CODE(0, 0x0113)
#define TSDB_CODE_INVALID_SHM_ID TAOS_DEF_ERROR_CODE(0, 0x0114)
#define TSDB_CODE_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x0115)
#define TSDB_CODE_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x0104) //
#define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0105) // internal
#define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0106) // internal
#define TSDB_CODE_REF_ID_REMOVED TAOS_DEF_ERROR_CODE(0, 0x0107) // internal
#define TSDB_CODE_REF_INVALID_ID TAOS_DEF_ERROR_CODE(0, 0x0108) // internal
#define TSDB_CODE_REF_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0109) // internal
#define TSDB_CODE_REF_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x010A) // internal
#define TSDB_CODE_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0110) //
#define TSDB_CODE_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0111) // internal
#define TSDB_CODE_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x0112) //
// #define TSDB_CODE_OUT_OF_SHM_MEM TAOS_DEF_ERROR_CODE(0, 0x0113)
// #define TSDB_CODE_INVALID_SHM_ID TAOS_DEF_ERROR_CODE(0, 0x0114)
#define TSDB_CODE_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x0115) //
#define TSDB_CODE_INVALID_MSG_LEN TAOS_DEF_ERROR_CODE(0, 0x0116) //
#define TSDB_CODE_INVALID_PTR TAOS_DEF_ERROR_CODE(0, 0x0117)
#define TSDB_CODE_INVALID_PARA TAOS_DEF_ERROR_CODE(0, 0x0118)
#define TSDB_CODE_INVALID_CFG TAOS_DEF_ERROR_CODE(0, 0x0119)
#define TSDB_CODE_INVALID_OPTION TAOS_DEF_ERROR_CODE(0, 0x011A)
#define TSDB_CODE_INVALID_JSON_FORMAT TAOS_DEF_ERROR_CODE(0, 0x011B)
#define TSDB_CODE_INVALID_VERSION_NUMBER TAOS_DEF_ERROR_CODE(0, 0x011C)
#define TSDB_CODE_INVALID_VERSION_STRING TAOS_DEF_ERROR_CODE(0, 0x011D)
#define TSDB_CODE_VERSION_NOT_COMPATIBLE TAOS_DEF_ERROR_CODE(0, 0x011E)
#define TSDB_CODE_CHECKSUM_ERROR TAOS_DEF_ERROR_CODE(0, 0x011F)
#define TSDB_CODE_INVALID_PTR TAOS_DEF_ERROR_CODE(0, 0x0117) // internal
#define TSDB_CODE_INVALID_PARA TAOS_DEF_ERROR_CODE(0, 0x0118) //
#define TSDB_CODE_INVALID_CFG TAOS_DEF_ERROR_CODE(0, 0x0119) // internal
#define TSDB_CODE_INVALID_OPTION TAOS_DEF_ERROR_CODE(0, 0x011A) // internal
#define TSDB_CODE_INVALID_JSON_FORMAT TAOS_DEF_ERROR_CODE(0, 0x011B) // internal
#define TSDB_CODE_INVALID_VERSION_NUMBER TAOS_DEF_ERROR_CODE(0, 0x011C) // internal
#define TSDB_CODE_INVALID_VERSION_STRING TAOS_DEF_ERROR_CODE(0, 0x011D) // internal
#define TSDB_CODE_VERSION_NOT_COMPATIBLE TAOS_DEF_ERROR_CODE(0, 0x011E) // internal
#define TSDB_CODE_CHECKSUM_ERROR TAOS_DEF_ERROR_CODE(0, 0x011F) // internal
#define TSDB_CODE_COMPRESS_ERROR TAOS_DEF_ERROR_CODE(0, 0x0120)
#define TSDB_CODE_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0121) //
......
......@@ -146,7 +146,6 @@ void taos_close(TAOS *taos) {
int taos_errno(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ_META(res)) {
if (terrno == TSDB_CODE_RPC_REDIRECT) terrno = TSDB_CODE_QRY_NOT_READY;
return terrno;
}
......@@ -154,12 +153,11 @@ int taos_errno(TAOS_RES *res) {
return 0;
}
return ((SRequestObj *)res)->code == TSDB_CODE_RPC_REDIRECT ? TSDB_CODE_QRY_NOT_READY : ((SRequestObj *)res)->code;
return ((SRequestObj *)res)->code;
}
const char *taos_errstr(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ_META(res)) {
if (terrno == TSDB_CODE_RPC_REDIRECT) terrno = TSDB_CODE_QRY_NOT_READY;
return (const char *)tstrerror(terrno);
}
......@@ -171,8 +169,7 @@ const char *taos_errstr(TAOS_RES *res) {
if (NULL != pRequest->msgBuf && (strlen(pRequest->msgBuf) > 0 || pRequest->code == TSDB_CODE_RPC_FQDN_ERROR)) {
return pRequest->msgBuf;
} else {
return pRequest->code == TSDB_CODE_RPC_REDIRECT ? (const char *)tstrerror(TSDB_CODE_QRY_NOT_READY)
: (const char *)tstrerror(pRequest->code);
return (const char *)tstrerror(pRequest->code);
}
}
......
......@@ -61,7 +61,7 @@ static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
pMsg->info.rsp = NULL;
}
if (code == TSDB_CODE_RPC_REDIRECT) {
if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING) {
mndPostProcessQueryMsg(pMsg);
}
......
......@@ -33,23 +33,6 @@ static inline void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) {
}
}
static inline void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet) {
pMsg->info.hasEpSet = 1;
SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info, .msgType = pMsg->msgType};
int32_t contLen = tSerializeSEpSet(NULL, 0, pNewEpSet);
rsp.pCont = rpcMallocCont(contLen);
if (rsp.pCont == NULL) {
pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
} else {
tSerializeSEpSet(rsp.pCont, contLen, pNewEpSet);
rsp.contLen = contLen;
}
dmSendRsp(&rsp);
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
}
int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
const STraceId *trace = &pMsg->info.traceId;
......@@ -243,9 +226,9 @@ static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) { rpcRegisterBrokenLin
static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) { rpcReleaseHandle(pHandle, type); }
static bool rpcRfp(int32_t code, tmsg_t msgType) {
if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_MNODE_NOT_FOUND ||
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_RPC_BROKEN_LINK ||
code == TSDB_CODE_VND_STOPPED || code == TSDB_CODE_APP_IS_STARTING || code == TSDB_CODE_APP_IS_STOPPING) {
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_MNODE_NOT_FOUND ||
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_VND_STOPPED ||
code == TSDB_CODE_APP_IS_STARTING || code == TSDB_CODE_APP_IS_STOPPING) {
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH ||
msgType == TDMT_SCH_MERGE_FETCH) {
return false;
......@@ -334,7 +317,6 @@ SMsgCb dmGetMsgcb(SDnode *pDnode) {
.clientRpc = pDnode->trans.clientRpc,
.sendReqFp = dmSendReq,
.sendRspFp = dmSendRsp,
.sendRedirectRspFp = dmSendRedirectRsp,
.registerBrokenLinkArgFp = dmRegisterBrokenLinkArg,
.releaseHandleFp = dmReleaseHandle,
.reportStartupFp = dmReportStartup,
......
......@@ -220,7 +220,7 @@ static int32_t mndProcessCreateAcctReq(SRpcMsg *pReq) {
return -1;
}
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
terrno = TSDB_CODE_OPS_NOT_SUPPORT;
mError("failed to process create acct request since %s", terrstr());
return -1;
}
......@@ -230,7 +230,7 @@ static int32_t mndProcessAlterAcctReq(SRpcMsg *pReq) {
return -1;
}
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
terrno = TSDB_CODE_OPS_NOT_SUPPORT;
mError("failed to process create acct request since %s", terrstr());
return -1;
}
......@@ -240,7 +240,7 @@ static int32_t mndProcessDropAcctReq(SRpcMsg *pReq) {
return -1;
}
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
terrno = TSDB_CODE_OPS_NOT_SUPPORT;
mError("failed to process create acct request since %s", terrstr());
return -1;
}
\ No newline at end of file
......@@ -945,7 +945,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
code = TSDB_CODE_MND_TRANS_NETWORK_UNAVAILL;
}
if (i != 0 && code == 0) {
code = TSDB_CODE_RPC_REDIRECT;
code = TSDB_CODE_MNODE_NOT_FOUND;
}
mInfo("trans:%d, client:%d send rsp, code:0x%x stage:%s app:%p", pTrans->id, i, code, mndTransStr(pTrans->stage),
pInfo->ahandle);
......@@ -1042,8 +1042,8 @@ static void mndTransResetAction(SMnode *pMnode, STrans *pTrans, STransAction *pA
pAction->rawWritten = 0;
pAction->msgSent = 0;
pAction->msgReceived = 0;
if (pAction->errCode == TSDB_CODE_RPC_REDIRECT || pAction->errCode == TSDB_CODE_SYN_NEW_CONFIG_ERROR ||
pAction->errCode == TSDB_CODE_SYN_INTERNAL_ERROR || pAction->errCode == TSDB_CODE_SYN_NOT_LEADER) {
if (pAction->errCode == TSDB_CODE_SYN_NEW_CONFIG_ERROR || pAction->errCode == TSDB_CODE_SYN_INTERNAL_ERROR ||
pAction->errCode == TSDB_CODE_SYN_NOT_LEADER) {
pAction->epSet.inUse = (pAction->epSet.inUse + 1) % pAction->epSet.numOfEps;
mInfo("trans:%d, %s:%d execute status is reset and set epset inuse:%d", pTrans->id, mndTransStr(pAction->stage),
pAction->id, pAction->epSet.inUse);
......
......@@ -32,7 +32,7 @@ TEST_F(MndTestAcct, 01_Create_Acct) {
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_ACCT, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_MSG_NOT_PROCESSED);
ASSERT_EQ(pRsp->code, TSDB_CODE_OPS_NOT_SUPPORT);
}
TEST_F(MndTestAcct, 02_Alter_Acct) {
......@@ -42,7 +42,7 @@ TEST_F(MndTestAcct, 02_Alter_Acct) {
SRpcMsg* pRsp = test.SendReq(TDMT_MND_ALTER_ACCT, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_MSG_NOT_PROCESSED);
ASSERT_EQ(pRsp->code, TSDB_CODE_OPS_NOT_SUPPORT);
}
TEST_F(MndTestAcct, 03_Drop_Acct) {
......@@ -52,5 +52,5 @@ TEST_F(MndTestAcct, 03_Drop_Acct) {
SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_ACCT, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_MSG_NOT_PROCESSED);
ASSERT_EQ(pRsp->code, TSDB_CODE_OPS_NOT_SUPPORT);
}
......@@ -100,7 +100,7 @@ int32_t vnodeSyncOpen(SVnode* pVnode, char* path);
int32_t vnodeSyncStart(SVnode* pVnode);
void vnodeSyncPreClose(SVnode* pVnode);
void vnodeSyncClose(SVnode* pVnode);
void vnodeRedirectRpcMsg(SVnode* pVnode, SRpcMsg* pMsg);
void vnodeRedirectRpcMsg(SVnode* pVnode, SRpcMsg* pMsg, int32_t code);
bool vnodeIsLeader(SVnode* pVnode);
bool vnodeIsRoleLeader(SVnode* pVnode);
......
......@@ -344,7 +344,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
vTrace("message in vnode query queue is processing");
// if ((pMsg->msgType == TDMT_SCH_QUERY) && !vnodeIsLeader(pVnode)) {
if ((pMsg->msgType == TDMT_SCH_QUERY) && !syncIsReadyForRead(pVnode->sync)) {
vnodeRedirectRpcMsg(pVnode, pMsg);
vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
return 0;
}
......@@ -367,12 +367,12 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
pMsg->msgType == TDMT_VND_BATCH_META) &&
!syncIsReadyForRead(pVnode->sync)) {
// !vnodeIsLeader(pVnode)) {
vnodeRedirectRpcMsg(pVnode, pMsg);
vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
return 0;
}
if (pMsg->msgType == TDMT_VND_TMQ_CONSUME && !pVnode->restored) {
vnodeRedirectRpcMsg(pVnode, pMsg);
vnodeRedirectRpcMsg(pVnode, pMsg, TSDB_CODE_SYN_RESTORING);
return 0;
}
......
......@@ -53,7 +53,7 @@ static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
}
}
void vnodeRedirectRpcMsg(SVnode *pVnode, SRpcMsg *pMsg) {
void vnodeRedirectRpcMsg(SVnode *pVnode, SRpcMsg *pMsg, int32_t code) {
SEpSet newEpSet = {0};
syncGetRetryEpSet(pVnode->sync, &newEpSet);
......@@ -66,8 +66,20 @@ void vnodeRedirectRpcMsg(SVnode *pVnode, SRpcMsg *pMsg) {
}
pMsg->info.hasEpSet = 1;
SRpcMsg rsp = {.code = TSDB_CODE_SYN_NOT_LEADER, .info = pMsg->info, .msgType = pMsg->msgType + 1};
tmsgSendRedirectRsp(&rsp, &newEpSet);
if (code == 0) code = TSDB_CODE_SYN_NOT_LEADER;
SRpcMsg rsp = {.code = code, .info = pMsg->info, .msgType = pMsg->msgType + 1};
int32_t contLen = tSerializeSEpSet(NULL, 0, &newEpSet);
rsp.pCont = rpcMallocCont(contLen);
if (rsp.pCont == NULL) {
pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
} else {
tSerializeSEpSet(rsp.pCont, contLen, &newEpSet);
rsp.contLen = contLen;
}
tmsgSendRsp(&rsp);
}
static void inline vnodeHandleWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
......@@ -88,7 +100,7 @@ static void inline vnodeHandleWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
static void vnodeHandleProposeError(SVnode *pVnode, SRpcMsg *pMsg, int32_t code) {
if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING) {
vnodeRedirectRpcMsg(pVnode, pMsg);
vnodeRedirectRpcMsg(pVnode, pMsg, code);
} else {
const STraceId *trace = &pMsg->info.traceId;
vGError("vgId:%d, msg:%p failed to propose since %s, code:0x%x", pVnode->config.vgId, pMsg, tstrerror(code), code);
......
......@@ -598,9 +598,10 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
return 0;
}
static bool udfdRpcRfp(int32_t code, tmsg_t msgType) {
if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK ||
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_MNODE_NOT_FOUND|| code == TSDB_CODE_APP_IS_STARTING || code == TSDB_CODE_APP_IS_STOPPING) {
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH || msgType == TDMT_SCH_MERGE_FETCH) {
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_SYN_NOT_LEADER ||
code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_MNODE_NOT_FOUND || code == TSDB_CODE_APP_IS_STARTING ||
code == TSDB_CODE_APP_IS_STOPPING) {
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH || msgType == TDMT_SCH_MERGE_FETCH) {
return false;
}
return true;
......
......@@ -214,20 +214,20 @@ void qwDbgSimulateRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx, bool *rsped) {
epSet.eps[2].port = 7300;
ctx->phase = QW_PHASE_POST_QUERY;
qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, &epSet);
qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, &epSet);
*rsped = true;
return;
}
if (TDMT_SCH_MERGE_QUERY == qwMsg->msgType && (0 == taosRand() % 3)) {
QW_SET_PHASE(ctx, QW_PHASE_POST_QUERY);
qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, NULL);
qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, NULL);
*rsped = true;
return;
}
if ((TDMT_SCH_FETCH == qwMsg->msgType) && (0 == taosRand() % 9)) {
qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, NULL);
qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, NULL);
*rsped = true;
return;
}
......
......@@ -54,8 +54,6 @@ void tmsgSendRsp(SRpcMsg* pMsg) {
#endif
}
void tmsgSendRedirectRsp(SRpcMsg* pMsg, const SEpSet* pNewEpSet) { (*defaultMsgCb.sendRedirectRspFp)(pMsg, pNewEpSet); }
void tmsgRegisterBrokenLinkArg(SRpcMsg* pMsg) { (*defaultMsgCb.registerBrokenLinkArgFp)(pMsg); }
void tmsgReleaseHandle(SRpcHandleInfo* pHandle, int8_t type) { (*defaultMsgCb.releaseHandleFp)(pHandle, type); }
......
......@@ -1519,7 +1519,9 @@ bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
transFreeMsg(pResp->pCont);
transUnrefCliHandle(pConn);
} else if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_INTERNAL_ERROR ||
code == TSDB_CODE_SYN_PROPOSE_NOT_READY || code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_VND_STOPPED) {
code == TSDB_CODE_SYN_PROPOSE_NOT_READY || code == TSDB_CODE_VND_STOPPED ||
code == TSDB_CODE_MNODE_NOT_FOUND || code == TSDB_CODE_APP_IS_STARTING ||
code == TSDB_CODE_APP_IS_STOPPING) {
tTrace("code str %s, contlen:%d 1", tstrerror(code), pResp->contLen);
noDelay = cliResetEpset(pCtx, pResp, true);
transFreeMsg(pResp->pCont);
......
......@@ -46,8 +46,6 @@ STaosError errors[] = {
#endif
// rpc
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_AUTH_FAILURE, "Authentication failure")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_REDIRECT, "Database not ready, need retry")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_UNAVAIL, "Unable to establish connection")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_FQDN_ERROR, "Unable to resolve FQDN")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_PORT_EADDRINUSE, "Port already in use")
......@@ -57,7 +55,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_TIMEOUT, "Conn read timeout")
//common & util
TAOS_DEFINE_ERROR(TSDB_CODE_TIME_UNSYNCED, "Client and server's time is not synchronized")
TAOS_DEFINE_ERROR(TSDB_CODE_OPS_NOT_SUPPORT, "Operation not supported")
TAOS_DEFINE_ERROR(TSDB_CODE_MEMORY_CORRUPTED, "Memory corrupted")
TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_MEMORY, "Out of Memory")
TAOS_DEFINE_ERROR(TSDB_CODE_FILE_CORRUPTED, "Data file corrupted")
TAOS_DEFINE_ERROR(TSDB_CODE_REF_NO_MEMORY, "Ref out of memory")
......@@ -70,8 +67,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_REF_NOT_EXIST, "Ref is not there")
TAOS_DEFINE_ERROR(TSDB_CODE_APP_ERROR, "Unexpected generic error")
TAOS_DEFINE_ERROR(TSDB_CODE_ACTION_IN_PROGRESS, "Action in progress")
TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_RANGE, "Out of range")
TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_SHM_MEM, "Out of Shared memory")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_SHM_ID, "Invalid SHM ID")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_MSG, "Invalid message")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_MSG_LEN, "Invalid message len")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_PTR, "Invalid pointer")
......@@ -320,7 +315,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SNODE_ALREADY_DEPLOYED, "Snode already deploye
TAOS_DEFINE_ERROR(TSDB_CODE_SNODE_NOT_DEPLOYED, "Snode not deployed")
// vnode
TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_VGROUP_ID, "Invalid Vgroup ID")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_VGROUP_ID, "Vnode moved to another dnode or was deleted")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, "Database write operation denied")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NOT_EXIST, "Vnode not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_ALREADY_EXIST, "Vnode already exist")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册