未验证 提交 62e2054e 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #18642 from taosdata/enh/TD-20891

refact: remove TSDB_CODE_APP_NOT_READY and TSDB_CODE_NODE_NOT_DEPLOYED
......@@ -264,21 +264,22 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
#define SYNC_SELF_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR)
#define SYNC_OTHER_LEADER_REDIRECT_ERROR(_code) (false) // used later
#define NEED_REDIRECT_ERROR(_code) \
((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || \
(_code) == TSDB_CODE_NODE_NOT_DEPLOYED || SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) || \
SYNC_SELF_LEADER_REDIRECT_ERROR(_code) || SYNC_OTHER_LEADER_REDIRECT_ERROR(_code) || \
(_code) == TSDB_CODE_APP_NOT_READY || (_code) == TSDB_CODE_RPC_BROKEN_LINK || \
#define NEED_REDIRECT_ERROR(_code) \
((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || \
(_code) == TSDB_CODE_MNODE_NOT_FOUND || SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) || \
SYNC_SELF_LEADER_REDIRECT_ERROR(_code) || SYNC_OTHER_LEADER_REDIRECT_ERROR(_code) || \
(_code) == TSDB_CODE_SYN_RESTORING || (_code) == TSDB_CODE_RPC_BROKEN_LINK || \
(_code) == TSDB_CODE_APP_IS_STARTING || (_code) == TSDB_CODE_APP_IS_STOPPING)
#define NEED_CLIENT_RM_TBLMETA_REQ(_type) \
((_type) == TDMT_VND_CREATE_TABLE || (_type) == TDMT_MND_CREATE_STB || (_type) == TDMT_VND_DROP_TABLE || \
(_type) == TDMT_MND_DROP_STB)
#define NEED_SCHEDULER_REDIRECT_ERROR(_code) \
((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_NODE_NOT_DEPLOYED || \
#define NEED_SCHEDULER_REDIRECT_ERROR(_code) \
((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_MNODE_NOT_FOUND || \
SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) || SYNC_SELF_LEADER_REDIRECT_ERROR(_code) || \
SYNC_OTHER_LEADER_REDIRECT_ERROR(_code) || (_code) == TSDB_CODE_APP_NOT_READY)
SYNC_OTHER_LEADER_REDIRECT_ERROR(_code) || (_code) == TSDB_CODE_SYN_RESTORING || \
(_code) == TSDB_CODE_APP_IS_STARTING || (_code) == TSDB_CODE_APP_IS_STOPPING)
#define REQUEST_TOTAL_EXEC_TIMES 2
......
......@@ -59,7 +59,7 @@ int32_t* taosGetErrno();
// #define TSDB_CODE_RPC_INVALID_MSG_TYPE TAOS_DEF_ERROR_CODE(0, 0x0011) //2.x
// #define TSDB_CODE_RPC_INVALID_RESPONSE_TYPE TAOS_DEF_ERROR_CODE(0, 0x0012) //2.x
#define TSDB_CODE_TIME_UNSYNCED TAOS_DEF_ERROR_CODE(0, 0x0013)
#define TSDB_CODE_APP_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0014)
// #define TSDB_CODE_APP_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0014) //2.x
#define TSDB_CODE_RPC_FQDN_ERROR TAOS_DEF_ERROR_CODE(0, 0x0015)
// #define TSDB_CODE_RPC_INVALID_VERSION TAOS_DEF_ERROR_CODE(0, 0x0016) //2.x
#define TSDB_CODE_RPC_PORT_EADDRINUSE TAOS_DEF_ERROR_CODE(0, 0x0017)
......@@ -298,6 +298,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_TRANS_CONFLICT TAOS_DEF_ERROR_CODE(0, 0x03D3)
#define TSDB_CODE_MND_TRANS_CLOG_IS_NULL TAOS_DEF_ERROR_CODE(0, 0x03D4)
#define TSDB_CODE_MND_TRANS_NETWORK_UNAVAILL TAOS_DEF_ERROR_CODE(0, 0x03D5)
#define TSDB_CODE_MND_LAST_TRANS_NOT_FINISHED TAOS_DEF_ERROR_CODE(0, 0x03D6) //internal
#define TSDB_CODE_MND_TRANS_UNKNOW_ERROR TAOS_DEF_ERROR_CODE(0, 0x03DF)
// mnode-mq
......@@ -330,13 +331,49 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_INVALID_SMA_OPTION TAOS_DEF_ERROR_CODE(0, 0x0482)
// dnode
#define TSDB_CODE_NODE_OFFLINE TAOS_DEF_ERROR_CODE(0, 0x0408)
#define TSDB_CODE_NODE_ALREADY_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0409)
#define TSDB_CODE_NODE_NOT_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x040A)
// #define TSDB_CODE_DND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0400) // 2.x
// #define TSDB_CODE_DND_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0401) // 2.x
// #define TSDB_CODE_DND_NO_WRITE_ACCESS TAOS_DEF_ERROR_CODE(0, 0x0402) // 2.x
// #define TSDB_CODE_DND_INVALID_MSG_LEN TAOS_DEF_ERROR_CODE(0, 0x0403) // 2.x
// #define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0404) // 2.x
// #define TSDB_CODE_DND_TOO_MANY_VNODES TAOS_DEF_ERROR_CODE(0, 0x0405) // 2.x
// #define TSDB_CODE_DND_EXITING TAOS_DEF_ERROR_CODE(0, 0x0406) // 2.x
// #define TSDB_CODE_DND_VNODE_OPEN_FAILED TAOS_DEF_ERROR_CODE(0, 0x0407) // 2.x
#define TSDB_CODE_DNODE_OFFLINE TAOS_DEF_ERROR_CODE(0, 0x0408)
#define TSDB_CODE_MNODE_ALREADY_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0409)
#define TSDB_CODE_MNODE_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x040A)
#define TSDB_CODE_MNODE_NOT_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x040B)
#define TSDB_CODE_QNODE_ALREADY_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x040C)
#define TSDB_CODE_QNODE_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x040D)
#define TSDB_CODE_QNODE_NOT_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x040E)
#define TSDB_CODE_SNODE_ALREADY_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x040F)
#define TSDB_CODE_SNODE_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0410)
#define TSDB_CODE_SNODE_NOT_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0411)
// vnode
// #define TSDB_CODE_VND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0500) // 2.x
// #define TSDB_CODE_VND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0501) // 2.x
// #define TSDB_CODE_VND_ACTION_NEED_REPROCESS. TAOS_DEF_ERROR_CODE(0, 0x0502) // 2.x
#define TSDB_CODE_VND_INVALID_VGROUP_ID TAOS_DEF_ERROR_CODE(0, 0x0503)
// #define TSDB_CODE_VND_INIT_FAILED TAOS_DEF_ERROR_CODE(0, 0x0504) // 2.x
// #define TSDB_CODE_VND_NO_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x0505) // 2.x
// #define TSDB_CODE_VND_NO_DISK_PERMISSIONS TAOS_DEF_ERROR_CODE(0, 0x0506) // 2.x
// #define TSDB_CODE_VND_NO_SUCH_FILE_OR_DIR TAOS_DEF_ERROR_CODE(0, 0x0507) // 2.x
// #define TSDB_CODE_VND_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0508) // 2.x
// #define TSDB_CODE_VND_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0509) // 2.x
// #define TSDB_CODE_VND_INVALID_VRESION_FILE TAOS_DEF_ERROR_CODE(0, 0x050A) // 2.x
// #define TSDB_CODE_VND_IS_FULL TAOS_DEF_ERROR_CODE(0, 0x050B) // 2.x
// #define TSDB_CODE_VND_IS_FLOWCTRL TAOS_DEF_ERROR_CODE(0, 0x050C) // 2.x
// #define TSDB_CODE_VND_IS_DROPPING TAOS_DEF_ERROR_CODE(0, 0x050D) // 2.x
// #define TSDB_CODE_VND_IS_BALANCING TAOS_DEF_ERROR_CODE(0, 0x050E) // 2.x
// #define TSDB_CODE_VND_IS_CLOSING TAOS_DEF_ERROR_CODE(0, 0x0510) // 2.x
// #define TSDB_CODE_VND_NOT_SYNCED TAOS_DEF_ERROR_CODE(0, 0x0511) // 2.x
#define TSDB_CODE_VND_NO_WRITE_AUTH TAOS_DEF_ERROR_CODE(0, 0x0512)
// #define TSDB_CODE_VND_IS_SYNCING TAOS_DEF_ERROR_CODE(0, 0x0513) // 2.x
// #define TSDB_CODE_VND_INVALID_TSDB_STATE TAOS_DEF_ERROR_CODE(0, 0x0514) // 2.x
// #define TSDB_CODE_WAIT_THREAD_TOO_MANY TAOS_DEF_ERROR_CODE(0, 0x0515) // 2.x
#define TSDB_CODE_VND_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0520) // internal
#define TSDB_CODE_VND_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0521) // internal
#define TSDB_CODE_VND_HASH_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x0522)
#define TSDB_CODE_VND_INVALID_TABLE_ACTION TAOS_DEF_ERROR_CODE(0, 0x0524)
#define TSDB_CODE_VND_COL_ALREADY_EXISTS TAOS_DEF_ERROR_CODE(0, 0x0525)
......
......@@ -216,7 +216,7 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
dDebug("vgId:%d, already exist", req.vgId);
tFreeSCreateVnodeReq(&req);
vmReleaseVnode(pMgmt, pVnode);
terrno = TSDB_CODE_NODE_ALREADY_DEPLOYED;
terrno = TSDB_CODE_VND_ALREADY_EXIST;
code = terrno;
return 0;
}
......@@ -307,7 +307,7 @@ int32_t vmProcessAlterVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
if (pVnode == NULL) {
dError("vgId:%d, failed to alter replica since %s", vgId, terrstr());
terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
terrno = TSDB_CODE_VND_NOT_EXIST;
return -1;
}
......@@ -369,7 +369,7 @@ int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
if (pVnode == NULL) {
dDebug("vgId:%d, failed to drop since %s", vgId, terrstr());
terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
terrno = TSDB_CODE_VND_NOT_EXIST;
return -1;
}
......
......@@ -152,7 +152,19 @@ static int32_t dmProcessCreateNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, ntype);
if (pWrapper != NULL) {
dmReleaseWrapper(pWrapper);
terrno = TSDB_CODE_NODE_ALREADY_DEPLOYED;
switch (ntype) {
case MNODE:
terrno = TSDB_CODE_MNODE_ALREADY_DEPLOYED;
break;
case QNODE:
terrno = TSDB_CODE_QNODE_ALREADY_DEPLOYED;
break;
case SNODE:
terrno = TSDB_CODE_SNODE_ALREADY_DEPLOYED;
break;
default:
terrno = TSDB_CODE_APP_ERROR;
}
dError("failed to create node since %s", terrstr());
return -1;
}
......@@ -191,7 +203,20 @@ static int32_t dmProcessDropNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, ntype);
if (pWrapper == NULL) {
terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
switch (ntype) {
case MNODE:
terrno = TSDB_CODE_MNODE_NOT_DEPLOYED;
break;
case QNODE:
terrno = TSDB_CODE_QNODE_NOT_DEPLOYED;
break;
case SNODE:
terrno = TSDB_CODE_SNODE_NOT_DEPLOYED;
break;
default:
terrno = TSDB_CODE_APP_ERROR;
}
dError("failed to drop node since %s", terrstr());
return -1;
}
......
......@@ -189,7 +189,6 @@ SMgmtWrapper *dmAcquireWrapper(SDnode *pDnode, EDndNodeType ntype) {
int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1);
// dTrace("node:%s, is acquired, ref:%d", pWrapper->name, refCount);
} else {
terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
pRetWrapper = NULL;
}
taosThreadRwlockUnlock(&pWrapper->lock);
......@@ -205,7 +204,20 @@ int32_t dmMarkWrapper(SMgmtWrapper *pWrapper) {
int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1);
// dTrace("node:%s, is marked, ref:%d", pWrapper->name, refCount);
} else {
terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
switch (pWrapper->ntype) {
case MNODE:
terrno = TSDB_CODE_MNODE_NOT_FOUND;
break;
case QNODE:
terrno = TSDB_CODE_QNODE_NOT_FOUND;
break;
case SNODE:
terrno = TSDB_CODE_SNODE_NOT_FOUND;
break;
default:
terrno = TSDB_CODE_APP_IS_STOPPING;
break;
}
code = -1;
}
taosThreadRwlockUnlock(&pWrapper->lock);
......
......@@ -172,8 +172,7 @@ _OVER:
if (IsReq(pRpc)) {
SRpcMsg rsp = {.code = code, .info = pRpc->info};
if ((code == TSDB_CODE_NODE_NOT_DEPLOYED || code == TSDB_CODE_APP_NOT_READY) && pRpc->msgType > TDMT_MND_MSG &&
pRpc->msgType < TDMT_VND_MSG) {
if (code == TSDB_CODE_MNODE_NOT_FOUND) {
dmBuildMnodeRedirectRsp(pDnode, &rsp);
}
......@@ -226,7 +225,11 @@ static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
if (pDnode->status != DND_STAT_RUNNING && pMsg->msgType < TDMT_SYNC_MSG) {
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
terrno = TSDB_CODE_NODE_OFFLINE;
if (pDnode->status == DND_STAT_INIT) {
terrno = TSDB_CODE_APP_IS_STARTING;
} else {
terrno = TSDB_CODE_APP_IS_STOPPING;
}
dError("failed to send rpc msg:%s since %s, handle:%p", TMSG_INFO(pMsg->msgType), terrstr(), pMsg->info.handle);
return -1;
} else {
......@@ -240,9 +243,9 @@ static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) { rpcRegisterBrokenLin
static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) { rpcReleaseHandle(pHandle, type); }
static bool rpcRfp(int32_t code, tmsg_t msgType) {
if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED ||
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_BROKEN_LINK ||
code == TSDB_CODE_VND_STOPPED) {
if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_MNODE_NOT_FOUND ||
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_RPC_BROKEN_LINK ||
code == TSDB_CODE_VND_STOPPED || code == TSDB_CODE_APP_IS_STARTING || code == TSDB_CODE_APP_IS_STOPPING) {
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH ||
msgType == TDMT_SCH_MERGE_FETCH) {
return false;
......
......@@ -40,7 +40,7 @@ TEST_F(DndTestMnode, 01_Create_Mnode) {
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_NODE_ALREADY_DEPLOYED);
ASSERT_EQ(pRsp->code, TSDB_CODE_MNODE_ALREADY_DEPLOYED);
}
{
......@@ -57,7 +57,7 @@ TEST_F(DndTestMnode, 01_Create_Mnode) {
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_NODE_ALREADY_DEPLOYED);
ASSERT_EQ(pRsp->code, TSDB_CODE_MNODE_ALREADY_DEPLOYED);
}
{
......@@ -77,7 +77,7 @@ TEST_F(DndTestMnode, 01_Create_Mnode) {
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_NODE_ALREADY_DEPLOYED);
ASSERT_EQ(pRsp->code, TSDB_CODE_MNODE_ALREADY_DEPLOYED);
}
}
......@@ -171,7 +171,7 @@ TEST_F(DndTestMnode, 03_Drop_Mnode) {
SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_NODE_NOT_DEPLOYED);
ASSERT_EQ(pRsp->code, TSDB_CODE_MNODE_NOT_DEPLOYED);
}
{
......@@ -188,7 +188,7 @@ TEST_F(DndTestMnode, 03_Drop_Mnode) {
SRpcMsg* pRsp = test.SendReq(TDMT_MND_ALTER_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_NODE_NOT_DEPLOYED);
ASSERT_EQ(pRsp->code, TSDB_CODE_MNODE_NOT_DEPLOYED);
}
{
......
......@@ -62,7 +62,7 @@ TEST_F(DndTestQnode, 01_Create_Qnode) {
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_QNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_NODE_ALREADY_DEPLOYED);
ASSERT_EQ(pRsp->code, TSDB_CODE_QNODE_ALREADY_DEPLOYED);
}
test.Restart();
......@@ -77,7 +77,7 @@ TEST_F(DndTestQnode, 01_Create_Qnode) {
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_QNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_NODE_ALREADY_DEPLOYED);
ASSERT_EQ(pRsp->code, TSDB_CODE_QNODE_ALREADY_DEPLOYED);
}
}
......@@ -120,7 +120,7 @@ TEST_F(DndTestQnode, 02_Drop_Qnode) {
SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_QNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_NODE_NOT_DEPLOYED);
ASSERT_EQ(pRsp->code, TSDB_CODE_QNODE_NOT_DEPLOYED);
}
test.Restart();
......@@ -135,7 +135,7 @@ TEST_F(DndTestQnode, 02_Drop_Qnode) {
SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_QNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_NODE_NOT_DEPLOYED);
ASSERT_EQ(pRsp->code, TSDB_CODE_QNODE_NOT_DEPLOYED);
}
{
......
......@@ -62,7 +62,7 @@ TEST_F(DndTestSnode, 01_Create_Snode) {
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_SNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_NODE_ALREADY_DEPLOYED);
ASSERT_EQ(pRsp->code, TSDB_CODE_SNODE_ALREADY_DEPLOYED);
}
test.Restart();
......@@ -77,7 +77,7 @@ TEST_F(DndTestSnode, 01_Create_Snode) {
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_SNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_NODE_ALREADY_DEPLOYED);
ASSERT_EQ(pRsp->code, TSDB_CODE_SNODE_ALREADY_DEPLOYED);
}
}
......@@ -120,7 +120,7 @@ TEST_F(DndTestSnode, 01_Drop_Snode) {
SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_SNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_NODE_NOT_DEPLOYED);
ASSERT_EQ(pRsp->code, TSDB_CODE_SNODE_NOT_DEPLOYED);
}
test.Restart();
......@@ -135,7 +135,7 @@ TEST_F(DndTestSnode, 01_Drop_Snode) {
SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_SNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_NODE_NOT_DEPLOYED);
ASSERT_EQ(pRsp->code, TSDB_CODE_SNODE_NOT_DEPLOYED);
}
{
......
......@@ -63,7 +63,7 @@ TEST_F(DndTestVnode, 01_Create_Vnode) {
ASSERT_EQ(pRsp->code, 0);
test.Restart();
} else {
ASSERT_EQ(pRsp->code, TSDB_CODE_NODE_ALREADY_DEPLOYED);
ASSERT_EQ(pRsp->code, TSDB_CODE_VND_ALREADY_EXIST);
}
}
}
......@@ -285,7 +285,7 @@ TEST_F(DndTestVnode, 06_Drop_Vnode) {
ASSERT_EQ(pRsp->code, 0);
test.Restart();
} else {
ASSERT_EQ(pRsp->code, TSDB_CODE_NODE_NOT_DEPLOYED);
ASSERT_EQ(pRsp->code, TSDB_CODE_VND_NOT_EXIST);
}
}
}
\ No newline at end of file
......@@ -787,7 +787,7 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
int32_t numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
if ((numOfVnodes > 0 || pMObj != NULL || pSObj != NULL || pQObj != NULL) && !dropReq.force) {
if (!mndIsDnodeOnline(pDnode, taosGetTimestampMs())) {
terrno = TSDB_CODE_NODE_OFFLINE;
terrno = TSDB_CODE_DNODE_OFFLINE;
mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, terrstr(),
numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
goto _OVER;
......
......@@ -71,7 +71,7 @@ static int32_t mndInsInitMeta(SHashObj *hash) {
int32_t mndBuildInsTableSchema(SMnode *pMnode, const char *dbFName, const char *tbName, bool sysinfo,
STableMetaRsp *pRsp) {
if (NULL == pMnode->infosMeta) {
terrno = TSDB_CODE_APP_NOT_READY;
terrno = TSDB_CODE_APP_ERROR;
return -1;
}
......@@ -97,7 +97,7 @@ int32_t mndBuildInsTableSchema(SMnode *pMnode, const char *dbFName, const char *
int32_t mndBuildInsTableCfg(SMnode *pMnode, const char *dbFName, const char *tbName, STableCfgRsp *pRsp) {
if (NULL == pMnode->infosMeta) {
terrno = TSDB_CODE_APP_NOT_READY;
terrno = TSDB_CODE_APP_ERROR;
return -1;
}
......
......@@ -45,7 +45,7 @@ static inline int32_t mndAcquireRpc(SMnode *pMnode) {
int32_t code = 0;
taosThreadRwlockRdlock(&pMnode->lock);
if (pMnode->stopped) {
terrno = TSDB_CODE_APP_NOT_READY;
terrno = TSDB_CODE_APP_IS_STOPPING;
code = -1;
} else if (!mndIsLeader(pMnode)) {
code = -1;
......@@ -599,11 +599,45 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_SCH_MERGE_FETCH || pMsg->msgType == TDMT_SCH_DROP_TASK) {
return 0;
}
if (mndAcquireRpc(pMsg->info.node) == 0) return 0;
SMnode *pMnode = pMsg->info.node;
SMnode *pMnode = pMsg->info.node;
taosThreadRwlockRdlock(&pMnode->lock);
if (pMnode->stopped) {
taosThreadRwlockUnlock(&pMnode->lock);
terrno = TSDB_CODE_APP_IS_STOPPING;
return -1;
}
terrno = 0;
SSyncState state = syncGetState(pMnode->syncMgmt.sync);
if (terrno != 0) {
taosThreadRwlockUnlock(&pMnode->lock);
return -1;
}
if (state.state != TAOS_SYNC_STATE_LEADER) {
taosThreadRwlockUnlock(&pMnode->lock);
terrno = TSDB_CODE_SYN_NOT_LEADER;
goto _OVER;
}
if (!state.restored || !pMnode->restored) {
taosThreadRwlockUnlock(&pMnode->lock);
terrno = TSDB_CODE_SYN_RESTORING;
goto _OVER;
}
#if 1
atomic_add_fetch_32(&pMnode->rpcRef, 1);
#else
int32_t ref = atomic_add_fetch_32(&pMnode->rpcRef, 1);
mTrace("mnode rpc is acquired, ref:%d", ref);
#endif
taosThreadRwlockUnlock(&pMnode->lock);
return 0;
_OVER:
if (pMsg->msgType == TDMT_MND_TMQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER ||
pMsg->msgType == TDMT_MND_TRANS_TIMER || pMsg->msgType == TDMT_MND_TTL_TIMER ||
pMsg->msgType == TDMT_MND_UPTIME_TIMER) {
......@@ -616,29 +650,24 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
SEpSet epSet = {0};
mndGetMnodeEpSet(pMnode, &epSet);
mDebug(
mGDebug(
"msg:%p, type:%s failed to process since %s, mnode restored:%d stopped:%d, sync restored:%d "
"role:%s, redirect numOfEps:%d inUse:%d",
pMsg, TMSG_INFO(pMsg->msgType), terrstr(), pMnode->restored, pMnode->stopped, state.restored,
syncStr(state.restored), epSet.numOfEps, epSet.inUse);
if (epSet.numOfEps > 0) {
for (int32_t i = 0; i < epSet.numOfEps; ++i) {
mDebug("mnode index:%d, ep:%s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
}
if (epSet.numOfEps <= 0) return -1;
int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
pMsg->info.rsp = rpcMallocCont(contLen);
if (pMsg->info.rsp != NULL) {
tSerializeSEpSet(pMsg->info.rsp, contLen, &epSet);
pMsg->info.hasEpSet = 1;
pMsg->info.rspLen = contLen;
terrno = TSDB_CODE_RPC_REDIRECT;
} else {
terrno = TSDB_CODE_OUT_OF_MEMORY;
}
} else {
terrno = TSDB_CODE_APP_NOT_READY;
for (int32_t i = 0; i < epSet.numOfEps; ++i) {
mDebug("mnode index:%d, ep:%s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
}
int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
pMsg->info.rsp = rpcMallocCont(contLen);
if (pMsg->info.rsp != NULL) {
tSerializeSEpSet(pMsg->info.rsp, contLen, &epSet);
pMsg->info.hasEpSet = 1;
pMsg->info.rspLen = contLen;
}
return -1;
......
......@@ -292,7 +292,7 @@ static int32_t mndBuildCreateMnodeRedoAction(STrans *pTrans, SDCreateMnodeReq *p
.pCont = pReq,
.contLen = contLen,
.msgType = TDMT_DND_CREATE_MNODE,
.acceptableCode = TSDB_CODE_NODE_ALREADY_DEPLOYED,
.acceptableCode = TSDB_CODE_MNODE_ALREADY_DEPLOYED,
};
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
......@@ -333,7 +333,7 @@ static int32_t mndBuildDropMnodeRedoAction(STrans *pTrans, SDDropMnodeReq *pDrop
.pCont = pReq,
.contLen = contLen,
.msgType = TDMT_DND_DROP_MNODE,
.acceptableCode = TSDB_CODE_NODE_NOT_DEPLOYED,
.acceptableCode = TSDB_CODE_MNODE_NOT_DEPLOYED,
};
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
......@@ -440,7 +440,7 @@ static int32_t mndProcessCreateMnodeReq(SRpcMsg *pReq) {
}
if (!mndIsDnodeOnline(pDnode, taosGetTimestampMs())) {
terrno = TSDB_CODE_NODE_OFFLINE;
terrno = TSDB_CODE_DNODE_OFFLINE;
goto _OVER;
}
......@@ -490,7 +490,7 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode
if (totalMnodes == 2) {
if (force) {
mError("cant't force drop dnode, since a mnode on it and replica is 2");
terrno = TSDB_CODE_NODE_OFFLINE;
terrno = TSDB_CODE_DNODE_OFFLINE;
return -1;
}
mInfo("vgId:1, has %d mnodes, exec redo log first", totalMnodes);
......@@ -574,7 +574,7 @@ static int32_t mndProcessDropMnodeReq(SRpcMsg *pReq) {
}
if (!mndIsDnodeOnline(pObj->pDnode, taosGetTimestampMs())) {
terrno = TSDB_CODE_NODE_OFFLINE;
terrno = TSDB_CODE_DNODE_OFFLINE;
goto _OVER;
}
......
......@@ -68,7 +68,7 @@ int32_t mndPerfsInitMeta(SHashObj *hash) {
int32_t mndBuildPerfsTableSchema(SMnode *pMnode, const char *dbFName, const char *tbName, STableMetaRsp *pRsp) {
if (NULL == pMnode->perfsMeta) {
terrno = TSDB_CODE_APP_NOT_READY;
terrno = TSDB_CODE_APP_ERROR;
return -1;
}
......@@ -94,7 +94,7 @@ int32_t mndBuildPerfsTableSchema(SMnode *pMnode, const char *dbFName, const char
int32_t mndBuildPerfsTableCfg(SMnode *pMnode, const char *dbFName, const char *tbName, STableCfgRsp *pRsp) {
if (NULL == pMnode->perfsMeta) {
terrno = TSDB_CODE_APP_NOT_READY;
terrno = TSDB_CODE_APP_ERROR;
return -1;
}
......
......@@ -205,7 +205,7 @@ static int32_t mndSetCreateQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, S
action.pCont = pReq;
action.contLen = contLen;
action.msgType = TDMT_DND_CREATE_QNODE;
action.acceptableCode = TSDB_CODE_NODE_ALREADY_DEPLOYED;
action.acceptableCode = TSDB_CODE_QNODE_ALREADY_DEPLOYED;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
......@@ -232,7 +232,7 @@ static int32_t mndSetCreateQnodeUndoActions(STrans *pTrans, SDnodeObj *pDnode, S
action.pCont = pReq;
action.contLen = contLen;
action.msgType = TDMT_DND_DROP_QNODE;
action.acceptableCode = TSDB_CODE_NODE_NOT_DEPLOYED;
action.acceptableCode = TSDB_CODE_QNODE_NOT_DEPLOYED;
if (mndTransAppendUndoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
......@@ -345,7 +345,7 @@ static int32_t mndSetDropQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SQn
action.pCont = pReq;
action.contLen = contLen;
action.msgType = TDMT_DND_DROP_QNODE;
action.acceptableCode = TSDB_CODE_NODE_NOT_DEPLOYED;
action.acceptableCode = TSDB_CODE_QNODE_NOT_DEPLOYED;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
......
......@@ -463,7 +463,7 @@ static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans,
action.pCont = pReq;
action.contLen = contLen;
action.msgType = TDMT_DND_CREATE_VNODE;
action.acceptableCode = TSDB_CODE_NODE_ALREADY_DEPLOYED;
action.acceptableCode = TSDB_CODE_VND_ALREADY_EXIST;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
......@@ -780,7 +780,7 @@ static int32_t mndSetDropSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans, SD
action.pCont = pReq;
action.contLen = contLen;
action.msgType = TDMT_DND_DROP_VNODE;
action.acceptableCode = TSDB_CODE_NODE_NOT_DEPLOYED;
action.acceptableCode = TSDB_CODE_VND_NOT_EXIST;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
......
......@@ -210,7 +210,7 @@ static int32_t mndSetCreateSnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, S
action.pCont = pReq;
action.contLen = contLen;
action.msgType = TDMT_DND_CREATE_SNODE;
action.acceptableCode = TSDB_CODE_NODE_ALREADY_DEPLOYED;
action.acceptableCode = TSDB_CODE_SNODE_ALREADY_DEPLOYED;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
......@@ -237,7 +237,7 @@ static int32_t mndSetCreateSnodeUndoActions(STrans *pTrans, SDnodeObj *pDnode, S
action.pCont = pReq;
action.contLen = contLen;
action.msgType = TDMT_DND_DROP_SNODE;
action.acceptableCode = TSDB_CODE_NODE_NOT_DEPLOYED;
action.acceptableCode = TSDB_CODE_SNODE_NOT_DEPLOYED;
if (mndTransAppendUndoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
......@@ -352,7 +352,7 @@ static int32_t mndSetDropSnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SSn
action.pCont = pReq;
action.contLen = contLen;
action.msgType = TDMT_DND_DROP_SNODE;
action.acceptableCode = TSDB_CODE_NODE_NOT_DEPLOYED;
action.acceptableCode = TSDB_CODE_SNODE_NOT_DEPLOYED;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
......
......@@ -318,7 +318,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
if (pMgmt->transId != 0) {
mError("trans:%d, can't be proposed since trans:%d already waiting for confirm", transId, pMgmt->transId);
taosWUnLockLatch(&pMgmt->lock);
terrno = TSDB_CODE_APP_NOT_READY;
terrno = TSDB_CODE_MND_LAST_TRANS_NOT_FINISHED;
return -1;
}
......@@ -339,13 +339,11 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
sdbSetApplyInfo(pMnode->pSdb, req.info.conn.applyIndex, req.info.conn.applyTerm, SYNC_INDEX_INVALID);
code = 0;
} else {
mInfo("trans:%d, failed to proposed since %s", transId, terrstr());
mError("trans:%d, failed to proposed since %s", transId, terrstr());
taosWLockLatch(&pMgmt->lock);
pMgmt->transId = 0;
taosWUnLockLatch(&pMgmt->lock);
if (terrno == TSDB_CODE_SYN_NOT_LEADER) {
terrno = TSDB_CODE_APP_NOT_READY;
} else {
if (terrno == 0) {
terrno = TSDB_CODE_APP_ERROR;
}
}
......@@ -381,20 +379,23 @@ void mndSyncStop(SMnode *pMnode) {
}
bool mndIsLeader(SMnode *pMnode) {
terrno = 0;
SSyncState state = syncGetState(pMnode->syncMgmt.sync);
if (state.state != TAOS_SYNC_STATE_LEADER || !state.restored) {
if (state.state != TAOS_SYNC_STATE_LEADER) {
terrno = TSDB_CODE_SYN_NOT_LEADER;
} else {
terrno = TSDB_CODE_APP_NOT_READY;
}
mDebug("vgId:1, mnode not ready, state:%s, restore:%d", syncStr(state.state), state.restored);
if (terrno != 0) {
mDebug("vgId:1, mnode is stopping");
return false;
}
if (state.state != TAOS_SYNC_STATE_LEADER) {
terrno = TSDB_CODE_SYN_NOT_LEADER;
mDebug("vgId:1, mnode not leader, state:%s", syncStr(state.state));
return false;
}
if (!mndGetRestored(pMnode)) {
terrno = TSDB_CODE_APP_NOT_READY;
if (!state.restored || !pMnode->restored) {
terrno = TSDB_CODE_SYN_RESTORING;
mDebug("vgId:1, mnode not restored:%d:%d", state.restored, pMnode->restored);
return false;
}
......
......@@ -918,10 +918,13 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
sendRsp = true;
}
} else {
if (pTrans->stage == TRN_STAGE_REDO_ACTION && ((code == TSDB_CODE_APP_NOT_READY && pTrans->failedTimes > 60) ||
(code != TSDB_CODE_APP_NOT_READY && pTrans->failedTimes > 6))) {
if (pTrans->stage == TRN_STAGE_REDO_ACTION) {
if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_APP_IS_STARTING) {
if (pTrans->failedTimes > 60) sendRsp = true;
} else {
if (pTrans->failedTimes > 6) sendRsp = true;
}
if (code == 0) code = TSDB_CODE_MND_TRANS_UNKNOW_ERROR;
sendRsp = true;
}
}
......
......@@ -998,7 +998,7 @@ int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVg
action.pCont = pReq;
action.contLen = contLen;
action.msgType = TDMT_DND_CREATE_VNODE;
action.acceptableCode = TSDB_CODE_NODE_ALREADY_DEPLOYED;
action.acceptableCode = TSDB_CODE_VND_ALREADY_EXIST;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
......@@ -1098,7 +1098,7 @@ int32_t mndAddDropVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgOb
action.pCont = pReq;
action.contLen = contLen;
action.msgType = TDMT_DND_DROP_VNODE;
action.acceptableCode = TSDB_CODE_NODE_NOT_DEPLOYED;
action.acceptableCode = TSDB_CODE_VND_NOT_EXIST;
if (isRedo) {
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
......
......@@ -173,7 +173,7 @@ class MndTestTrans2 : public ::testing::Test {
action.pCont = pReq;
action.contLen = contLen;
action.msgType = TDMT_DND_CREATE_MNODE;
action.acceptableCode = TSDB_CODE_NODE_ALREADY_DEPLOYED;
action.acceptableCode = TSDB_CODE_MNODE_ALREADY_DEPLOYED;
mndTransAppendRedoAction(pTrans, &action);
}
......@@ -190,7 +190,7 @@ class MndTestTrans2 : public ::testing::Test {
action.pCont = pReq;
action.contLen = contLen;
action.msgType = TDMT_DND_CREATE_MNODE;
action.acceptableCode = TSDB_CODE_NODE_ALREADY_DEPLOYED;
action.acceptableCode = TSDB_CODE_MNODE_ALREADY_DEPLOYED;
mndTransAppendUndoAction(pTrans, &action);
}
......
......@@ -153,8 +153,8 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs)
if (!pVnode->restored) {
vGError("vgId:%d, msg:%p failed to process since restore not finished", vgId, pMsg);
terrno = TSDB_CODE_APP_NOT_READY;
vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_APP_NOT_READY);
terrno = TSDB_CODE_SYN_RESTORING;
vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_SYN_RESTORING);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
continue;
......@@ -544,21 +544,23 @@ bool vnodeIsRoleLeader(SVnode *pVnode) {
}
bool vnodeIsLeader(SVnode *pVnode) {
terrno = 0;
SSyncState state = syncGetState(pVnode->sync);
if (state.state != TAOS_SYNC_STATE_LEADER || !state.restored) {
if (state.state != TAOS_SYNC_STATE_LEADER) {
terrno = TSDB_CODE_SYN_NOT_LEADER;
} else {
terrno = TSDB_CODE_APP_NOT_READY;
}
vInfo("vgId:%d, vnode not ready, state:%s, restore:%d", pVnode->config.vgId, syncStr(state.state), state.restored);
if (terrno != 0) {
vInfo("vgId:%d, vnode is stopping", pVnode->config.vgId);
return false;
}
if (state.state != TAOS_SYNC_STATE_LEADER) {
terrno = TSDB_CODE_SYN_NOT_LEADER;
vInfo("vgId:%d, vnode not leader, state:%s", pVnode->config.vgId, syncStr(state.state));
return false;
}
if (!pVnode->restored) {
vInfo("vgId:%d, vnode not restored", pVnode->config.vgId);
terrno = TSDB_CODE_APP_NOT_READY;
if (!state.restored || !pVnode->restored) {
terrno = TSDB_CODE_SYN_RESTORING;
vInfo("vgId:%d, vnode not restored:%d:%d", pVnode->config.vgId, state.restored, pVnode->restored);
return false;
}
......
......@@ -598,8 +598,8 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
return 0;
}
static bool udfdRpcRfp(int32_t code, tmsg_t msgType) {
if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED ||
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_BROKEN_LINK) {
if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK ||
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_MNODE_NOT_FOUND|| code == TSDB_CODE_APP_IS_STARTING || code == TSDB_CODE_APP_IS_STOPPING) {
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH || msgType == TDMT_SCH_MERGE_FETCH) {
return false;
}
......
......@@ -401,62 +401,62 @@ int32_t syncStepDown(int64_t rid, SyncTerm newTerm) {
bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) {
if (pSyncNode == NULL) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
sError("sync ready for read error");
return false;
}
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER && pSyncNode->restoreFinish) {
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
terrno = TSDB_CODE_SYN_NOT_LEADER;
return false;
}
if (pSyncNode->restoreFinish) {
return true;
}
bool ready = false;
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER && !pSyncNode->restoreFinish) {
if (!pSyncNode->pFsm->FpApplyQueueEmptyCb(pSyncNode->pFsm)) {
// apply queue not empty
ready = false;
if (!pSyncNode->pFsm->FpApplyQueueEmptyCb(pSyncNode->pFsm)) {
// apply queue not empty
ready = false;
} else {
if (!pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore)) {
SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
SSyncRaftEntry* pEntry = NULL;
SLRUCache* pCache = pSyncNode->pLogStore->pCache;
LRUHandle* h = taosLRUCacheLookup(pCache, &lastIndex, sizeof(lastIndex));
int32_t code = 0;
if (h) {
pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
code = 0;
} else {
if (!pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore)) {
SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
SSyncRaftEntry* pEntry = NULL;
SLRUCache* pCache = pSyncNode->pLogStore->pCache;
LRUHandle* h = taosLRUCacheLookup(pCache, &lastIndex, sizeof(lastIndex));
int32_t code = 0;
if (h) {
pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
code = 0;
pSyncNode->pLogStore->cacheHit++;
sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", lastIndex, pEntry->bytes, pEntry);
pSyncNode->pLogStore->cacheHit++;
sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", lastIndex, pEntry->bytes, pEntry);
} else {
pSyncNode->pLogStore->cacheMiss++;
sNTrace(pSyncNode, "miss cache index:%" PRId64, lastIndex);
} else {
pSyncNode->pLogStore->cacheMiss++;
sNTrace(pSyncNode, "miss cache index:%" PRId64, lastIndex);
code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, lastIndex, &pEntry);
}
code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, lastIndex, &pEntry);
if (code == 0 && pEntry != NULL) {
if (pEntry->originalRpcType == TDMT_SYNC_NOOP && pEntry->term == pSyncNode->pRaftStore->currentTerm) {
ready = true;
}
if (code == 0 && pEntry != NULL) {
if (pEntry->originalRpcType == TDMT_SYNC_NOOP && pEntry->term == pSyncNode->pRaftStore->currentTerm) {
ready = true;
}
if (h) {
taosLRUCacheRelease(pCache, h, false);
} else {
syncEntryDestroy(pEntry);
}
if (h) {
taosLRUCacheRelease(pCache, h, false);
} else {
syncEntryDestroy(pEntry);
}
}
}
}
if (!ready) {
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
terrno = TSDB_CODE_SYN_NOT_LEADER;
} else {
terrno = TSDB_CODE_APP_NOT_READY;
}
terrno = TSDB_CODE_SYN_RESTORING;
}
return ready;
......@@ -549,7 +549,11 @@ SSyncState syncGetState(int64_t rid) {
if (pSyncNode != NULL) {
state.state = pSyncNode->state;
state.restored = pSyncNode->restoreFinish;
state.canRead = syncNodeIsReadyForRead(pSyncNode);
if (pSyncNode->vgId != 1) {
state.canRead = syncNodeIsReadyForRead(pSyncNode);
} else {
state.canRead = state.restored;
}
syncNodeRelease(pSyncNode);
}
......
......@@ -56,7 +56,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_TIMEOUT, "Conn read timeout")
//common & util
TAOS_DEFINE_ERROR(TSDB_CODE_TIME_UNSYNCED, "Client and server's time is not synchronized")
TAOS_DEFINE_ERROR(TSDB_CODE_APP_NOT_READY, "Database not ready")
TAOS_DEFINE_ERROR(TSDB_CODE_OPS_NOT_SUPPORT, "Operation not supported")
TAOS_DEFINE_ERROR(TSDB_CODE_MEMORY_CORRUPTED, "Memory corrupted")
TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_MEMORY, "Out of Memory")
......@@ -277,6 +276,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_INVALID_STAGE, "Invalid stage to kill
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CONFLICT, "Conflict transaction not completed")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_UNKNOW_ERROR, "Unknown transaction error")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CLOG_IS_NULL, "Transaction commitlog is null")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_LAST_TRANS_NOT_FINISHED, "Last Transaction not finished")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_NETWORK_UNAVAILL, "Unable to establish connection While execute transaction")
// mnode-mq
......@@ -308,14 +308,23 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_NOT_EXIST, "SMA does not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_SMA_OPTION, "Invalid sma option")
// dnode
TAOS_DEFINE_ERROR(TSDB_CODE_NODE_OFFLINE, "Node is offline")
TAOS_DEFINE_ERROR(TSDB_CODE_NODE_ALREADY_DEPLOYED, "Node already deployed")
TAOS_DEFINE_ERROR(TSDB_CODE_NODE_NOT_DEPLOYED, "Node not deployed")
TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_OFFLINE, "Dnode is offline")
TAOS_DEFINE_ERROR(TSDB_CODE_MNODE_NOT_FOUND, "Mnode not found")
TAOS_DEFINE_ERROR(TSDB_CODE_MNODE_ALREADY_DEPLOYED, "Mnode already deployed")
TAOS_DEFINE_ERROR(TSDB_CODE_MNODE_NOT_DEPLOYED, "Mnode not deployed")
TAOS_DEFINE_ERROR(TSDB_CODE_QNODE_NOT_FOUND, "Qnode not found")
TAOS_DEFINE_ERROR(TSDB_CODE_QNODE_ALREADY_DEPLOYED, "Qnode already deployed")
TAOS_DEFINE_ERROR(TSDB_CODE_QNODE_NOT_DEPLOYED, "Qnode not deployed")
TAOS_DEFINE_ERROR(TSDB_CODE_SNODE_NOT_FOUND, "Snode not found")
TAOS_DEFINE_ERROR(TSDB_CODE_SNODE_ALREADY_DEPLOYED, "Snode already deployed")
TAOS_DEFINE_ERROR(TSDB_CODE_SNODE_NOT_DEPLOYED, "Snode not deployed")
// vnode
TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_VGROUP_ID, "Invalid Vgroup ID")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, "Database write operation denied")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_HASH_MISMATCH, "Hash value mismatch")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NOT_EXIST, "Vnode not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_ALREADY_EXIST, "Vnode already exist")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_HASH_MISMATCH, "Vnode hash mismatch")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_TABLE_ACTION, "Invalid table action")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_ALREADY_EXISTS, "Table column already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_NOT_EXISTS, "Table column not exists")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册