提交 33131552 编写于 作者: H hjxilinx

[TBASE-814]

上级 69cdd4cd
......@@ -134,6 +134,7 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
tscProcessSql(pObj->pHb);
}
//TODO HANDLE error from mgmt
void tscGetConnToMgmt(SSqlObj *pSql, uint8_t *pCode) {
STscObj *pTscObj = pSql->pTscObj;
#ifdef CLUSTER
......@@ -163,10 +164,11 @@ void tscGetConnToMgmt(SSqlObj *pSql, uint8_t *pCode) {
connInit.spi = 1;
connInit.encrypt = 0;
connInit.secret = pSql->pTscObj->pass;
#ifdef CLUSTER
connInit.peerIp = tscMgmtIpList.ipstr[pSql->index];
#else
connInit.peerIp = tsServerIpStr;
connInit.peerIp = tsServerIpStr;
#endif
thandle = taosOpenRpcConn(&connInit, pCode);
}
......@@ -278,6 +280,11 @@ void tscGetConnToVnode(SSqlObj *pSql, uint8_t *pCode) {
break;
}
// the pSql->res.code is the previous error code.
if (pSql->thandle == NULL && pSql->retry >= pSql->maxRetry) {
*pCode = pSql->res.code;
}
}
int tscSendMsgToServer(SSqlObj *pSql) {
......@@ -389,10 +396,8 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) {
// todo taos_stop_query() in async model
/*
* in case of
* 1. query cancelled(pRes->code != TSDB_CODE_QUERY_CANCELLED), do NOT re-issue the
* request to server.
* 2. retrieve, do NOT re-issue the retrieve request since the qhandle may
* have been released by server
* 1. query cancelled(pRes->code != TSDB_CODE_QUERY_CANCELLED), do NOT re-issue the request to server.
* 2. retrieve, do NOT re-issue the retrieve request since the qhandle may have been released by server
*/
if (pCmd->command != TSDB_SQL_FETCH && pCmd->command != TSDB_SQL_RETRIEVE && pCmd->command != TSDB_SQL_KILL_QUERY &&
pRes->code != TSDB_CODE_QUERY_CANCELLED) {
......@@ -419,7 +424,9 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) {
}
} else {
#ifdef CLUSTER
if (pMsg->content[0] == TSDB_CODE_REDIRECT) {
uint16_t rspCode = pMsg->content[0];
if (rspCode == TSDB_CODE_REDIRECT) {
tscTrace("%p it shall be redirected!", pSql);
taosAddConnIntoCache(tscConnCache, thandle, pSql->ip, pSql->vnode, pObj->user);
pSql->thandle = NULL;
......@@ -433,28 +440,23 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) {
code = tscSendMsgToServer(pSql);
if (code == 0) return pSql;
msg = NULL;
} else if (pMsg->content[0] == TSDB_CODE_NOT_ACTIVE_SESSION || pMsg->content[0] == TSDB_CODE_NETWORK_UNAVAIL ||
pMsg->content[0] == TSDB_CODE_INVALID_SESSION_ID) {
} else if (rspCode == TSDB_CODE_NOT_ACTIVE_TABLE || rspCode == TSDB_CODE_INVALID_TABLE_ID ||
rspCode == TSDB_CODE_INVALID_VNODE_ID || rspCode == TSDB_CODE_NOT_ACTIVE_VNODE ||
rspCode == TSDB_CODE_NETWORK_UNAVAIL) {
#else
if (pMsg->content[0] == TSDB_CODE_NOT_ACTIVE_SESSION || pMsg->content[0] == TSDB_CODE_NETWORK_UNAVAIL ||
pMsg->content[0] == TSDB_CODE_INVALID_SESSION_ID) {
if (rspCode == TSDB_CODE_NOT_ACTIVE_TABLE || rspCode == TSDB_CODE_INVALID_TABLE_ID ||
rspCode == TSDB_CODE_INVALID_VNODE_ID || rspCode == TSDB_CODE_NOT_ACTIVE_VNODE ||
rspCode == TSDB_CODE_NETWORK_UNAVAIL) {
#endif
pSql->thandle = NULL;
taosAddConnIntoCache(tscConnCache, thandle, pSql->ip, pSql->vnode, pObj->user);
if (pMeterMetaInfo != NULL && UTIL_METER_IS_METRIC(pMeterMetaInfo) &&
pMsg->content[0] == TSDB_CODE_NOT_ACTIVE_SESSION) {
if ((pCmd->command == TSDB_SQL_INSERT || pCmd->command == TSDB_SQL_SELECT) &&
(rspCode == TSDB_CODE_INVALID_TABLE_ID || rspCode == TSDB_CODE_INVALID_VNODE_ID)) {
/*
* for metric query, in case of any meter missing during query, sub-query of metric query will failed,
* causing metric query failed, and return TSDB_CODE_METRICMETA_EXPIRED code to app
*/
tscTrace("%p invalid meters id cause metric query failed, code:%d", pSql, pMsg->content[0]);
code = TSDB_CODE_METRICMETA_EXPIRED;
} else if ((pCmd->command == TSDB_SQL_INSERT || pCmd->command == TSDB_SQL_SELECT) &&
pMsg->content[0] == TSDB_CODE_INVALID_SESSION_ID) {
/*
* session id is invalid(e.g., less than 0 or larger than maximum session per
* vnode) in submit/query msg, no retry
* In case of the insert/select operations, the invalid table(vnode) id means
* the submit/query msg is invalid, renew meter meta will not help to fix this problem,
* so return the invalid_query_msg to client directly.
*/
code = TSDB_CODE_INVALID_QUERY_MSG;
} else if (pCmd->command == TSDB_SQL_CONNECT) {
......@@ -462,9 +464,11 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) {
} else if (pCmd->command == TSDB_SQL_HB) {
code = TSDB_CODE_NOT_READY;
} else {
tscTrace("%p it shall renew meter meta, code:%d", pSql, pMsg->content[0]);
tscTrace("%p it shall renew meter meta, code:%d", pSql, rspCode);
pSql->maxRetry = TSDB_VNODES_SUPPORT * 2;
pSql->res.code = (uint8_t) rspCode; // keep the previous error code
code = tscRenewMeterMeta(pSql, pMeterMetaInfo->name);
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return pSql;
......@@ -476,7 +480,7 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) {
msg = NULL;
} else { // for other error set and return to invoker
code = pMsg->content[0];
code = rspCode;
}
}
......
......@@ -41,7 +41,7 @@ extern "C" {
#define TSDB_CODE_ACTION_NOT_ONLINE 18
#define TSDB_CODE_ACTION_SEND_FAILD 19
#define TSDB_CODE_NOT_ACTIVE_SESSION 20
#define TSDB_CODE_INSERT_FAILED 21
#define TSDB_CODE_INVALID_VNODE_ID 21
#define TSDB_CODE_APP_ERROR 22
#define TSDB_CODE_INVALID_IE 23
#define TSDB_CODE_INVALID_VALUE 24
......@@ -74,7 +74,7 @@ extern "C" {
#define TSDB_CODE_OTHERS 51
#define TSDB_CODE_NO_REMOVE_MASTER 52
#define TSDB_CODE_WRONG_SCHEMA 53
#define TSDB_CODE_NO_RESULT 54
#define TSDB_CODE_NOT_ACTIVE_VNODE 54
#define TSDB_CODE_TOO_MANY_USERS 55
#define TSDB_CODE_TOO_MANY_DATABSES 56
#define TSDB_CODE_TOO_MANY_TABLES 57
......@@ -134,6 +134,8 @@ extern "C" {
#define TSDB_CODE_NOT_SUPER_TABLE 111 //
#define TSDB_CODE_DUPLICATE_TAGS 112 // tags value for join not unique
#define TSDB_CODE_INVALID_SUBMIT_MSG 113
#define TSDB_CODE_NOT_ACTIVE_TABLE 114
#define TSDB_CODE_INVALID_TABLE_ID 115
// message type
#define TSDB_MSG_TYPE_REG 1
......
......@@ -145,7 +145,7 @@ char *tsError[] = {"success",
"not online",
"send failed",
"not active session", // 20
"insert failed",
"invalid vnode id",
"App error",
"invalid IE",
"invalid value",
......@@ -178,7 +178,7 @@ char *tsError[] = {"success",
"others",
"can't remove dnode which is master",
"wrong schema",
"no results",
"vnode not active(not created yet or dropped already)",
"num of users execeed maxUsers", //55
"num of databases execeed maxDbs",
"num of tables execeed maxTables",
......@@ -233,9 +233,11 @@ char *tsError[] = {"success",
"invalid query message",
"timestamp disordered in cache block",
"timestamp disordered in file block",
"invalid commit log", //110
"server no disk space",
"invalid commit log",
"server no disk space", //110
"only super table has metric meta info",
"tags value not unique for join",
"invalid submit message",
"not active table(not created yet or deleted already)", //114
"invalid table id",
};
......@@ -105,14 +105,14 @@ int vnodeProcessCreateMeterRequest(char *pMsg, int msgLen, SMgmtObj *pObj) {
if (vid >= TSDB_MAX_VNODES || vid < 0) {
dError("vid:%d, vnode is out of range", vid);
code = TSDB_CODE_INVALID_SESSION_ID;
code = TSDB_CODE_INVALID_VNODE_ID;
goto _over;
}
pVnode = vnodeList + vid;
if (pVnode->cfg.maxSessions <= 0) {
dError("vid:%d, not activated", vid);
code = TSDB_CODE_NOT_ACTIVE_SESSION;
code = TSDB_CODE_NOT_ACTIVE_VNODE;
goto _over;
}
......@@ -141,27 +141,27 @@ int vnodeProcessAlterStreamRequest(char *pMsg, int msgLen, SMgmtObj *pObj) {
if (vid >= TSDB_MAX_VNODES || vid < 0) {
dError("vid:%d, vnode is out of range", vid);
code = TSDB_CODE_INVALID_SESSION_ID;
code = TSDB_CODE_INVALID_VNODE_ID;
goto _over;
}
pVnode = vnodeList + vid;
if (pVnode->cfg.maxSessions <= 0 || pVnode->pCachePool == NULL) {
dError("vid:%d is not activated yet", pAlter->vnode);
code = TSDB_CODE_INVALID_SESSION_ID;
code = TSDB_CODE_NOT_ACTIVE_VNODE;
goto _over;
}
if (pAlter->sid >= pVnode->cfg.maxSessions || pAlter->sid < 0) {
dError("vid:%d sid:%d uid:%ld, sid is out of range", pAlter->vnode, pAlter->sid, pAlter->uid);
code = TSDB_CODE_INVALID_SESSION_ID;
code = TSDB_CODE_INVALID_TABLE_ID;
goto _over;
}
SMeterObj *pMeterObj = vnodeList[vid].meterList[sid];
if (pMeterObj == NULL || sid != pMeterObj->sid || vid != pMeterObj->vnode) {
dError("vid:%d sid:%d, no active session", vid, sid);
code = TSDB_CODE_NOT_ACTIVE_SESSION;
dError("vid:%d sid:%d, no active table", vid, sid);
code = TSDB_CODE_NOT_ACTIVE_TABLE;
goto _over;
}
......@@ -195,7 +195,7 @@ int vnodeProcessCreateMeterMsg(char *pMsg, int msgLen) {
if (pCreate->vnode >= TSDB_MAX_VNODES || pCreate->vnode < 0) {
dError("vid:%d is out of range", pCreate->vnode);
code = TSDB_CODE_INVALID_SESSION_ID;
code = TSDB_CODE_INVALID_VNODE_ID;
goto _create_over;
}
......@@ -203,13 +203,13 @@ int vnodeProcessCreateMeterMsg(char *pMsg, int msgLen) {
if (pVnode->pCachePool == NULL) {
dError("vid:%d is not activated yet", pCreate->vnode);
vnodeSendVpeerCfgMsg(pCreate->vnode);
code = TSDB_CODE_NOT_ACTIVE_SESSION;
code = TSDB_CODE_NOT_ACTIVE_VNODE;
goto _create_over;
}
if (pCreate->sid >= pVnode->cfg.maxSessions || pCreate->sid < 0) {
dError("vid:%d sid:%d id:%s, sid is out of range", pCreate->vnode, pCreate->sid, pCreate->meterId);
code = TSDB_CODE_INVALID_SESSION_ID;
code = TSDB_CODE_INVALID_TABLE_ID;
goto _create_over;
}
......
......@@ -618,7 +618,7 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
dWarn("vid:%d sid:%d id:%s, meter is dropped, abort insert, state:%d", pObj->vnode, pObj->sid, pObj->meterId,
pObj->state);
code = TSDB_CODE_NOT_ACTIVE_SESSION;
code = TSDB_CODE_NOT_ACTIVE_TABLE;
break;
}
......
......@@ -269,7 +269,7 @@ int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj) {
if (pQueryMsg->vnode >= TSDB_MAX_VNODES || pQueryMsg->vnode < 0) {
dTrace("qmsg:%p,vid:%d is out of range", pQueryMsg, pQueryMsg->vnode);
code = TSDB_CODE_INVALID_SESSION_ID;
code = TSDB_CODE_INVALID_TABLE_ID;
goto _query_over;
}
......@@ -278,7 +278,7 @@ int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj) {
if (pVnode->cfg.maxSessions == 0) {
dError("qmsg:%p,vid:%d is not activated yet", pQueryMsg, pQueryMsg->vnode);
vnodeSendVpeerCfgMsg(pQueryMsg->vnode);
code = TSDB_CODE_NOT_ACTIVE_SESSION;
code = TSDB_CODE_NOT_ACTIVE_TABLE;
goto _query_over;
}
......@@ -295,7 +295,7 @@ int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj) {
if (pVnode->meterList == NULL) {
dError("qmsg:%p,vid:%d has been closed", pQueryMsg, pQueryMsg->vnode);
code = TSDB_CODE_NOT_ACTIVE_SESSION;
code = TSDB_CODE_NOT_ACTIVE_VNODE;
goto _query_over;
}
......@@ -305,7 +305,7 @@ int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj) {
dTrace("qmsg:%p sid:%d is out of range, valid range:[%d,%d]", pQueryMsg, pSids[i]->sid, 0,
pVnode->cfg.maxSessions);
code = TSDB_CODE_INVALID_SESSION_ID;
code = TSDB_CODE_INVALID_TABLE_ID;
goto _query_over;
}
}
......@@ -488,7 +488,7 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) {
if (pSubmit->vnode >= TSDB_MAX_VNODES || pSubmit->vnode < 0) {
dTrace("vnode:%d is out of range", pSubmit->vnode);
code = TSDB_CODE_INVALID_SESSION_ID;
code = TSDB_CODE_INVALID_VNODE_ID;
goto _submit_over;
}
......@@ -496,7 +496,7 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) {
if (pVnode->cfg.maxSessions == 0 || pVnode->meterList == NULL) {
dError("vid:%d is not activated for submit", pSubmit->vnode);
vnodeSendVpeerCfgMsg(pSubmit->vnode);
code = TSDB_CODE_NOT_ACTIVE_SESSION;
code = TSDB_CODE_NOT_ACTIVE_VNODE;
goto _submit_over;
}
......@@ -529,7 +529,7 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) {
if (pBlocks->sid >= pVnode->cfg.maxSessions || pBlocks->sid <= 0) {
dTrace("sid:%d is out of range", pBlocks->sid);
code = TSDB_CODE_INVALID_SESSION_ID;
code = TSDB_CODE_INVALID_TABLE_ID;
goto _submit_over;
}
......@@ -538,9 +538,9 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) {
SMeterObj *pMeterObj = vnodeList[vnode].meterList[sid];
if (pMeterObj == NULL) {
dError("vid:%d sid:%d, no active session", vnode, sid);
dError("vid:%d sid:%d, no active table", vnode, sid);
vnodeSendMeterCfgMsg(vnode, sid);
code = TSDB_CODE_NOT_ACTIVE_SESSION;
code = TSDB_CODE_NOT_ACTIVE_TABLE;
goto _submit_over;
}
......@@ -579,7 +579,7 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) {
if (vnodeIsMeterState(pMeterObj, TSDB_METER_STATE_DELETING)) {
dTrace("vid:%d sid:%d id:%s, it is removed, state:%d", pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId,
pMeterObj->state);
code = TSDB_CODE_NOT_ACTIVE_SESSION;
code = TSDB_CODE_NOT_ACTIVE_TABLE;
break;
} else {// waiting for 300ms by default and try again
dTrace("vid:%d sid:%d id:%s, try submit again since in state:%d", pMeterObj->vnode, pMeterObj->sid,
......
......@@ -553,7 +553,7 @@ int32_t vnodeIncQueryRefCount(SQueryMeterMsg* pQueryMsg, SMeterSidExtInfo** pSid
if (pMeter == NULL || (pMeter->state > TSDB_METER_STATE_INSERT)) {
if (pMeter == NULL || vnodeIsMeterState(pMeter, TSDB_METER_STATE_DELETING)) {
code = TSDB_CODE_NOT_ACTIVE_SESSION;
code = TSDB_CODE_NOT_ACTIVE_TABLE;
dError("qmsg:%p, vid:%d sid:%d, not there or will be dropped", pQueryMsg, pQueryMsg->vnode, pSids[i]->sid);
vnodeSendMeterCfgMsg(pQueryMsg->vnode, pSids[i]->sid);
} else {//update or import
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册