未验证 提交 c0f37c72 编写于 作者: S slguan 提交者: GitHub

Merge pull request #716 from taosdata/feature/liaohj

Feature/liaohj
......@@ -30,10 +30,10 @@ extern "C" {
#include "tsdb.h"
#include "tscSecondaryMerge.h"
#define UTIL_METER_IS_METRIC(cmd) (((cmd)->pMeterMeta != NULL) && ((cmd)->pMeterMeta->meterType == TSDB_METER_METRIC))
#define UTIL_METER_IS_NOMRAL_METER(cmd) (!(UTIL_METER_IS_METRIC(cmd)))
#define UTIL_METER_IS_CREATE_FROM_METRIC(cmd) \
(((cmd)->pMeterMeta != NULL) && ((cmd)->pMeterMeta->meterType == TSDB_METER_MTABLE))
#define UTIL_METER_IS_METRIC(metaInfo) (((metaInfo)->pMeterMeta != NULL) && ((metaInfo)->pMeterMeta->meterType == TSDB_METER_METRIC))
#define UTIL_METER_IS_NOMRAL_METER(metaInfo) (!(UTIL_METER_IS_METRIC(metaInfo)))
#define UTIL_METER_IS_CREATE_FROM_METRIC(metaInfo) \
(((metaInfo)->pMeterMeta != NULL) && ((metaInfo)->pMeterMeta->meterType == TSDB_METER_MTABLE))
#define TSDB_COL_IS_TAG(f) (((f)&TSDB_COL_TAG) != 0)
......
......@@ -1482,7 +1482,9 @@ static void first_function_f(SQLFunctionCtx *pCtx, int32_t index) {
SET_VAL(pCtx, 1, 1);
memcpy(pCtx->aOutputBuf, pData, pCtx->inputBytes);
DO_UPDATE_TAG_COLUMNS(pCtx, 0);
TSKEY ts = pCtx->ptsList[index];
DO_UPDATE_TAG_COLUMNS(pCtx, ts);
SResultInfo *pInfo = GET_RES_INFO(pCtx);
pInfo->hasResult = DATA_SET_FLAG;
......@@ -1575,7 +1577,7 @@ static void first_dist_func_merge(SQLFunctionCtx *pCtx) {
SFirstLastInfo *pOutput = (SFirstLastInfo *)(pCtx->aOutputBuf + pCtx->inputBytes);
if (pOutput->hasResult != DATA_SET_FLAG || pInput->ts < pOutput->ts) {
memcpy(pCtx->aOutputBuf, pData, pCtx->inputBytes + sizeof(SFirstLastInfo));
DO_UPDATE_TAG_COLUMNS(pCtx, 0);
DO_UPDATE_TAG_COLUMNS(pCtx, pInput->ts);
}
}
......@@ -1623,7 +1625,9 @@ static void last_function(SQLFunctionCtx *pCtx) {
}
memcpy(pCtx->aOutputBuf, data, pCtx->inputBytes);
DO_UPDATE_TAG_COLUMNS(pCtx, 0);
TSKEY ts = pCtx->ptsList[i];
DO_UPDATE_TAG_COLUMNS(pCtx, ts);
SResultInfo *pInfo = GET_RES_INFO(pCtx);
pInfo->hasResult = DATA_SET_FLAG;
......@@ -1648,7 +1652,9 @@ static void last_function_f(SQLFunctionCtx *pCtx, int32_t index) {
SET_VAL(pCtx, 1, 1);
memcpy(pCtx->aOutputBuf, pData, pCtx->inputBytes);
DO_UPDATE_TAG_COLUMNS(pCtx, 0);
TSKEY ts = pCtx->ptsList[index];
DO_UPDATE_TAG_COLUMNS(pCtx, ts);
SResultInfo *pResInfo = GET_RES_INFO(pCtx);
pResInfo->hasResult = DATA_SET_FLAG;
......@@ -1745,7 +1751,7 @@ static void last_dist_func_merge(SQLFunctionCtx *pCtx) {
if (pOutput->hasResult != DATA_SET_FLAG || pOutput->ts < pInput->ts) {
memcpy(pCtx->aOutputBuf, pData, pCtx->inputBytes + sizeof(SFirstLastInfo));
DO_UPDATE_TAG_COLUMNS(pCtx, 0);
DO_UPDATE_TAG_COLUMNS(pCtx, pInput->ts);
}
}
......@@ -1800,7 +1806,7 @@ static void last_row_function(SQLFunctionCtx *pCtx) {
pInfo1->ts = pCtx->param[0].i64Key;
pInfo1->hasResult = DATA_SET_FLAG;
DO_UPDATE_TAG_COLUMNS(pCtx, 0);
DO_UPDATE_TAG_COLUMNS(pCtx, pInfo1->ts);
}
SET_VAL(pCtx, pCtx->size, 1);
......@@ -3779,9 +3785,6 @@ static void getStatics_i64(int64_t *primaryKey, int64_t *data, int32_t numOfRow,
assert(numOfRow <= INT16_MAX);
int64_t lastKey = 0;
int64_t lastVal = TSDB_DATA_BIGINT_NULL;
for (int32_t i = 0; i < numOfRow; ++i) {
if (isNull(&data[i], TSDB_DATA_TYPE_BIGINT)) {
(*numOfNull) += 1;
......@@ -3873,9 +3876,6 @@ static void getStatics_d(int64_t *primaryKey, double *data, int32_t numOfRow, do
assert(numOfRow <= INT16_MAX);
int64_t lastKey = 0;
double lastVal = TSDB_DATA_DOUBLE_NULL;
for (int32_t i = 0; i < numOfRow; ++i) {
if (isNull(&data[i], TSDB_DATA_TYPE_DOUBLE)) {
(*numOfNull) += 1;
......
......@@ -2806,59 +2806,6 @@ static bool functionCompatibleCheck(SSqlCmd* pCmd) {
}
}
// additional check for select aggfuntion(column), column1 from table_name group by(column1);
if ((pCmd->type & TSDB_QUERY_TYPE_PROJECTION_QUERY) == TSDB_QUERY_TYPE_PROJECTION_QUERY) {
bool isAggFunc = false;
for (int32_t i = 0; i < pCmd->fieldsInfo.numOfOutputCols; ++i) {
int16_t functionId = tscSqlExprGet(pCmd, i)->functionId;
if (functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_TAGPRJ || functionId == TSDB_FUNC_TS ||
functionId == TSDB_FUNC_ARITHM) {
continue;
}
if ((aAggs[functionId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) == 0) {
isAggFunc = true;
break;
}
}
// TODO change the type, the type is not correct
if (isAggFunc) {
pCmd->type &= (~TSDB_QUERY_TYPE_PROJECTION_QUERY);
// agg function mixed up with project query without group by exists
if (pCmd->groupbyExpr.numOfGroupCols == 0) {
return false;
}
// get the project column
int32_t numOfPrjColumn = 0;
for (int32_t i = 0; i < pCmd->fieldsInfo.numOfOutputCols; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pCmd, i);
if (pExpr->functionId == TSDB_FUNC_PRJ) {
numOfPrjColumn += 1;
bool qualifiedCol = false;
for (int32_t j = 0; j < pCmd->groupbyExpr.numOfGroupCols; ++j) {
if (pExpr->colInfo.colId == pCmd->groupbyExpr.columnInfo[j].colId) {
qualifiedCol = true;
pExpr->param[0].i64Key = 1; // limit the output to be 1 for each state value
pExpr->numOfParams = 1;
break;
}
}
if (!qualifiedCol) {
setErrMsg(pCmd, msg1);
return false;
}
}
}
}
}
return true;
}
......@@ -5416,6 +5363,27 @@ static void doUpdateSqlFunctionForTagPrj(SSqlCmd* pCmd) {
}
}
static void doUpdateSqlFunctionForColPrj(SSqlCmd* pCmd) {
for (int32_t i = 0; i < pCmd->fieldsInfo.numOfOutputCols; ++i) {
SSqlExpr *pExpr = tscSqlExprGet(pCmd, i);
if (pExpr->functionId == TSDB_FUNC_PRJ) {
bool qualifiedCol = false;
for (int32_t j = 0; j < pCmd->groupbyExpr.numOfGroupCols; ++j) {
if (pExpr->colInfo.colId == pCmd->groupbyExpr.columnInfo[j].colId) {
qualifiedCol = true;
pExpr->param[0].i64Key = 1; // limit the output to be 1 for each state value
pExpr->numOfParams = 1;
break;
}
}
assert(qualifiedCol);
}
}
}
static bool tagColumnInGroupby(SSqlGroupbyExpr* pGroupbyExpr, int16_t columnId) {
for (int32_t j = 0; j < pGroupbyExpr->numOfGroupCols; ++j) {
if (columnId == pGroupbyExpr->columnInfo[j].colId && pGroupbyExpr->columnInfo[j].flag == TSDB_COL_TAG) {
......@@ -5480,7 +5448,8 @@ static void updateTagPrjFunction(SSqlCmd* pCmd) {
static int32_t checkUpdateTagPrjFunctions(SSqlCmd* pCmd) {
const char* msg1 = "only one selectivity function allowed in presence of tags function";
const char* msg2 = "functions not allowed";
const char* msg3 = "aggregation function should not be mixed up with projection";
bool tagColExists = false;
int16_t numOfTimestamp = 0; // primary timestamp column
int16_t numOfSelectivity = 0;
......@@ -5494,21 +5463,21 @@ static int32_t checkUpdateTagPrjFunctions(SSqlCmd* pCmd) {
break;
}
}
if (tagColExists) { // check if the selectivity function exists
for (int32_t i = 0; i < pCmd->fieldsInfo.numOfOutputCols; ++i) {
int16_t functionId = tscSqlExprGet(pCmd, i)->functionId;
if (functionId == TSDB_FUNC_TAGPRJ || functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_TS) {
continue;
}
if ((aAggs[functionId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) != 0) {
numOfSelectivity++;
} else {
numOfAggregation++;
}
for (int32_t i = 0; i < pCmd->fieldsInfo.numOfOutputCols; ++i) {
int16_t functionId = tscSqlExprGet(pCmd, i)->functionId;
if (functionId == TSDB_FUNC_TAGPRJ || functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_TS) {
continue;
}
if ((aAggs[functionId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) != 0) {
numOfSelectivity++;
} else {
numOfAggregation++;
}
}
if (tagColExists) { // check if the selectivity function exists
// When the tag projection function on tag column that is not in the group by clause, aggregation function and
// selectivity function exist in select clause is not allowed.
if (numOfAggregation > 0) {
......@@ -5521,6 +5490,7 @@ static int32_t checkUpdateTagPrjFunctions(SSqlCmd* pCmd) {
*/
if (numOfSelectivity == 1) {
doUpdateSqlFunctionForTagPrj(pCmd);
doUpdateSqlFunctionForColPrj(pCmd);
} else if (numOfSelectivity > 1) {
/*
* If more than one selectivity functions exist, all the selectivity functions must be last_row.
......@@ -5539,6 +5509,20 @@ static int32_t checkUpdateTagPrjFunctions(SSqlCmd* pCmd) {
}
doUpdateSqlFunctionForTagPrj(pCmd);
doUpdateSqlFunctionForColPrj(pCmd);
}
} else {
if ((pCmd->type & TSDB_QUERY_TYPE_PROJECTION_QUERY) == TSDB_QUERY_TYPE_PROJECTION_QUERY) {
if (numOfAggregation > 0 && pCmd->groupbyExpr.numOfGroupCols == 0) {
setErrMsg(pCmd, msg3);
return TSDB_CODE_INVALID_SQL;
}
if (numOfAggregation > 0 || numOfSelectivity > 0) {
// clear the projection type flag
pCmd->type &= (~TSDB_QUERY_TYPE_PROJECTION_QUERY);
doUpdateSqlFunctionForColPrj(pCmd);
}
}
}
......@@ -5668,8 +5652,7 @@ int32_t doFunctionsCompatibleCheck(SSqlObj* pSql) {
}
if (IS_MULTIOUTPUT(aAggs[functId].nStatus) && functId != TSDB_FUNC_TOP && functId != TSDB_FUNC_BOTTOM &&
functId != TSDB_FUNC_TAGPRJ &&
(functId == TSDB_FUNC_PRJ && pExpr->colInfo.colId != PRIMARYKEY_TIMESTAMP_COL_INDEX)) {
functId != TSDB_FUNC_TAGPRJ && functId != TSDB_FUNC_PRJ) {
setErrMsg(pCmd, msg1);
return TSDB_CODE_INVALID_SQL;
}
......@@ -5697,6 +5680,8 @@ int32_t doFunctionsCompatibleCheck(SSqlObj* pSql) {
setErrMsg(pCmd, msg3);
return TSDB_CODE_INVALID_SQL;
}
return TSDB_CODE_SUCCESS;
} else {
return checkUpdateTagPrjFunctions(pCmd);
}
......
......@@ -134,6 +134,7 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
tscProcessSql(pObj->pHb);
}
//TODO HANDLE error from mgmt
void tscGetConnToMgmt(SSqlObj *pSql, uint8_t *pCode) {
STscObj *pTscObj = pSql->pTscObj;
#ifdef CLUSTER
......@@ -163,10 +164,11 @@ void tscGetConnToMgmt(SSqlObj *pSql, uint8_t *pCode) {
connInit.spi = 1;
connInit.encrypt = 0;
connInit.secret = pSql->pTscObj->pass;
#ifdef CLUSTER
connInit.peerIp = tscMgmtIpList.ipstr[pSql->index];
#else
connInit.peerIp = tsServerIpStr;
connInit.peerIp = tsServerIpStr;
#endif
thandle = taosOpenRpcConn(&connInit, pCode);
}
......@@ -278,6 +280,11 @@ void tscGetConnToVnode(SSqlObj *pSql, uint8_t *pCode) {
break;
}
// the pSql->res.code is the previous error code.
if (pSql->thandle == NULL && pSql->retry >= pSql->maxRetry) {
*pCode = pSql->res.code;
}
}
int tscSendMsgToServer(SSqlObj *pSql) {
......@@ -313,11 +320,19 @@ int tscSendMsgToServer(SSqlObj *pSql) {
char *pStart = taosBuildReqHeader(pSql->thandle, pSql->cmd.msgType, buf);
if (pStart) {
/*
* this SQL object may be released by other thread due to the completion of this query even before the log
* is dumped to log file. So the signature needs to be kept in a local variable.
*/
uint64_t signature = (uint64_t) pSql->signature;
if (tscUpdateVnodeMsg[pSql->cmd.command]) (*tscUpdateVnodeMsg[pSql->cmd.command])(pSql, buf);
int ret = taosSendMsgToPeerH(pSql->thandle, pStart, pSql->cmd.payloadLen, pSql);
if (ret >= 0) code = 0;
tscTrace("%p send msg ret:%d code:%d sig:%p", pSql, ret, code, pSql->signature);
if (ret >= 0) {
code = 0;
}
tscTrace("%p send msg ret:%d code:%d sig:%p", pSql, ret, code, signature);
}
}
......@@ -389,10 +404,8 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) {
// todo taos_stop_query() in async model
/*
* in case of
* 1. query cancelled(pRes->code != TSDB_CODE_QUERY_CANCELLED), do NOT re-issue the
* request to server.
* 2. retrieve, do NOT re-issue the retrieve request since the qhandle may
* have been released by server
* 1. query cancelled(pRes->code != TSDB_CODE_QUERY_CANCELLED), do NOT re-issue the request to server.
* 2. retrieve, do NOT re-issue the retrieve request since the qhandle may have been released by server
*/
if (pCmd->command != TSDB_SQL_FETCH && pCmd->command != TSDB_SQL_RETRIEVE && pCmd->command != TSDB_SQL_KILL_QUERY &&
pRes->code != TSDB_CODE_QUERY_CANCELLED) {
......@@ -419,7 +432,9 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) {
}
} else {
#ifdef CLUSTER
if (pMsg->content[0] == TSDB_CODE_REDIRECT) {
uint16_t rspCode = pMsg->content[0];
if (rspCode == TSDB_CODE_REDIRECT) {
tscTrace("%p it shall be redirected!", pSql);
taosAddConnIntoCache(tscConnCache, thandle, pSql->ip, pSql->vnode, pObj->user);
pSql->thandle = NULL;
......@@ -433,28 +448,23 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) {
code = tscSendMsgToServer(pSql);
if (code == 0) return pSql;
msg = NULL;
} else if (pMsg->content[0] == TSDB_CODE_NOT_ACTIVE_SESSION || pMsg->content[0] == TSDB_CODE_NETWORK_UNAVAIL ||
pMsg->content[0] == TSDB_CODE_INVALID_SESSION_ID) {
} else if (rspCode == TSDB_CODE_NOT_ACTIVE_TABLE || rspCode == TSDB_CODE_INVALID_TABLE_ID ||
rspCode == TSDB_CODE_INVALID_VNODE_ID || rspCode == TSDB_CODE_NOT_ACTIVE_VNODE ||
rspCode == TSDB_CODE_NETWORK_UNAVAIL) {
#else
if (pMsg->content[0] == TSDB_CODE_NOT_ACTIVE_SESSION || pMsg->content[0] == TSDB_CODE_NETWORK_UNAVAIL ||
pMsg->content[0] == TSDB_CODE_INVALID_SESSION_ID) {
if (rspCode == TSDB_CODE_NOT_ACTIVE_TABLE || rspCode == TSDB_CODE_INVALID_TABLE_ID ||
rspCode == TSDB_CODE_INVALID_VNODE_ID || rspCode == TSDB_CODE_NOT_ACTIVE_VNODE ||
rspCode == TSDB_CODE_NETWORK_UNAVAIL) {
#endif
pSql->thandle = NULL;
taosAddConnIntoCache(tscConnCache, thandle, pSql->ip, pSql->vnode, pObj->user);
if (pMeterMetaInfo != NULL && UTIL_METER_IS_METRIC(pMeterMetaInfo) &&
pMsg->content[0] == TSDB_CODE_NOT_ACTIVE_SESSION) {
if ((pCmd->command == TSDB_SQL_INSERT || pCmd->command == TSDB_SQL_SELECT) &&
(rspCode == TSDB_CODE_INVALID_TABLE_ID || rspCode == TSDB_CODE_INVALID_VNODE_ID)) {
/*
* for metric query, in case of any meter missing during query, sub-query of metric query will failed,
* causing metric query failed, and return TSDB_CODE_METRICMETA_EXPIRED code to app
*/
tscTrace("%p invalid meters id cause metric query failed, code:%d", pSql, pMsg->content[0]);
code = TSDB_CODE_METRICMETA_EXPIRED;
} else if ((pCmd->command == TSDB_SQL_INSERT || pCmd->command == TSDB_SQL_SELECT) &&
pMsg->content[0] == TSDB_CODE_INVALID_SESSION_ID) {
/*
* session id is invalid(e.g., less than 0 or larger than maximum session per
* vnode) in submit/query msg, no retry
* In case of the insert/select operations, the invalid table(vnode) id means
* the submit/query msg is invalid, renew meter meta will not help to fix this problem,
* so return the invalid_query_msg to client directly.
*/
code = TSDB_CODE_INVALID_QUERY_MSG;
} else if (pCmd->command == TSDB_SQL_CONNECT) {
......@@ -462,9 +472,11 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) {
} else if (pCmd->command == TSDB_SQL_HB) {
code = TSDB_CODE_NOT_READY;
} else {
tscTrace("%p it shall renew meter meta, code:%d", pSql, pMsg->content[0]);
tscTrace("%p it shall renew meter meta, code:%d", pSql, rspCode);
pSql->maxRetry = TSDB_VNODES_SUPPORT * 2;
pSql->res.code = (uint8_t) rspCode; // keep the previous error code
code = tscRenewMeterMeta(pSql, pMeterMetaInfo->name);
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return pSql;
......@@ -476,7 +488,7 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) {
msg = NULL;
} else { // for other error set and return to invoker
code = pMsg->content[0];
code = rspCode;
}
}
......@@ -723,9 +735,16 @@ int tscProcessSql(SSqlObj *pSql) {
#else
pSql->maxRetry = 2;
#endif
// the pMeterMetaInfo cannot be NULL
if (pMeterMetaInfo == NULL) {
pSql->res.code = TSDB_CODE_OTHERS;
return pSql->res.code;
}
if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) {
pSql->index = pMeterMetaInfo->pMeterMeta->index;
} else { // it must be the parent SSqlObj for metric query
} else { // it must be the parent SSqlObj for super table query
if ((pSql->cmd.type & TSDB_QUERY_TYPE_SUBQUERY) != 0) {
int32_t idx = pSql->cmd.vnodeIdx;
SVnodeSidList *pSidList = tscGetVnodeSidList(pMeterMetaInfo->pMetricMeta, idx);
......@@ -2460,10 +2479,10 @@ int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql) {
pMsg += sizeof(SMgmtHead);
*((uint64_t *)pMsg) = pSql->res.qhandle;
*((uint64_t *) pMsg) = pSql->res.qhandle;
pMsg += sizeof(pSql->res.qhandle);
*pMsg = htons(pCmd->type);
*((uint16_t*) pMsg) = htons(pCmd->type);
pMsg += sizeof(pCmd->type);
msgLen = pMsg - pStart;
......@@ -3451,11 +3470,12 @@ int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) {
pRes->row = 0;
/**
* If the query result is exhausted, the connection will be recycled.
* If current query is to free resource at server side, the connection will be recycle.
* If the query result is exhausted, or current query is to free resource at server side,
* the connection will be recycled.
*/
if ((pRes->numOfRows == 0 && !(tscProjectionQueryOnMetric(pCmd) && pRes->offset > 0)) ||
((pCmd->type & TSDB_QUERY_TYPE_FREE_RESOURCE) == TSDB_QUERY_TYPE_FREE_RESOURCE)) {
tscTrace("%p no result or free resource, recycle connection", pSql);
taosAddConnIntoCache(tscConnCache, pSql->thandle, pSql->ip, pSql->vnode, pObj->user);
pSql->thandle = NULL;
} else {
......@@ -3769,7 +3789,7 @@ void tscInitMsgs() {
tscProcessMsgRsp[TSDB_SQL_MULTI_META] = tscProcessMultiMeterMetaRsp;
tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp;
tscProcessMsgRsp[TSDB_SQL_RETRIEVE] = tscProcessRetrieveRspFromMgmt;
tscProcessMsgRsp[TSDB_SQL_RETRIEVE] = tscProcessRetrieveRspFromVnode; // rsp handled by same function.
tscProcessMsgRsp[TSDB_SQL_DESCRIBE_TABLE] = tscProcessDescribeTableRsp;
tscProcessMsgRsp[TSDB_SQL_RETRIEVE_TAGS] = tscProcessTagRetrieveRsp;
tscProcessMsgRsp[TSDB_SQL_RETRIEVE_EMPTY_RESULT] = tscProcessEmptyResultRsp;
......
......@@ -41,7 +41,7 @@ extern "C" {
#define TSDB_CODE_ACTION_NOT_ONLINE 18
#define TSDB_CODE_ACTION_SEND_FAILD 19
#define TSDB_CODE_NOT_ACTIVE_SESSION 20
#define TSDB_CODE_INSERT_FAILED 21
#define TSDB_CODE_INVALID_VNODE_ID 21
#define TSDB_CODE_APP_ERROR 22
#define TSDB_CODE_INVALID_IE 23
#define TSDB_CODE_INVALID_VALUE 24
......@@ -74,7 +74,7 @@ extern "C" {
#define TSDB_CODE_OTHERS 51
#define TSDB_CODE_NO_REMOVE_MASTER 52
#define TSDB_CODE_WRONG_SCHEMA 53
#define TSDB_CODE_NO_RESULT 54
#define TSDB_CODE_NOT_ACTIVE_VNODE 54
#define TSDB_CODE_TOO_MANY_USERS 55
#define TSDB_CODE_TOO_MANY_DATABSES 56
#define TSDB_CODE_TOO_MANY_TABLES 57
......@@ -134,6 +134,8 @@ extern "C" {
#define TSDB_CODE_NOT_SUPER_TABLE 111 //
#define TSDB_CODE_DUPLICATE_TAGS 112 // tags value for join not unique
#define TSDB_CODE_INVALID_SUBMIT_MSG 113
#define TSDB_CODE_NOT_ACTIVE_TABLE 114
#define TSDB_CODE_INVALID_TABLE_ID 115
// message type
#define TSDB_MSG_TYPE_REG 1
......@@ -673,7 +675,7 @@ typedef struct {
typedef struct {
uint64_t qhandle;
int16_t free;
uint16_t free;
} SRetrieveMeterMsg;
typedef struct {
......
......@@ -846,7 +846,7 @@ void shellGetGrantInfo(void *con) {
TAOS_FIELD *fields = taos_fetch_fields(result);
TAOS_ROW row = taos_fetch_row(result);
if (row == NULL) {
fprintf(stderr, "\nGrant information is empty.\n");
fprintf(stderr, "\nFailed to get grant information from server. Abort.\n");
exit(0);
}
......
......@@ -25,8 +25,10 @@ extern "C" {
#include <arpa/inet.h>
#include <assert.h>
#include <ctype.h>
#include <dirent.h>
#include <endian.h>
#include <errno.h>
#include <float.h>
#include <ifaddrs.h>
#include <limits.h>
......@@ -61,6 +63,7 @@ extern "C" {
#include <unistd.h>
#include <wchar.h>
#include <wordexp.h>
#include <wctype.h>
#define taosCloseSocket(x) \
{ \
......
......@@ -145,7 +145,7 @@ char *tsError[] = {"success",
"not online",
"send failed",
"not active session", // 20
"insert failed",
"invalid vnode id",
"App error",
"invalid IE",
"invalid value",
......@@ -178,7 +178,7 @@ char *tsError[] = {"success",
"others",
"can't remove dnode which is master",
"wrong schema",
"no results",
"vnode not active(not created yet or dropped already)",
"num of users execeed maxUsers", //55
"num of databases execeed maxDbs",
"num of tables execeed maxTables",
......@@ -233,9 +233,11 @@ char *tsError[] = {"success",
"invalid query message",
"timestamp disordered in cache block",
"timestamp disordered in file block",
"invalid commit log", //110
"server no disk space",
"invalid commit log",
"server no disk space", //110
"only super table has metric meta info",
"tags value not unique for join",
"invalid submit message",
"not active table(not created yet or deleted already)", //114
"invalid table id",
};
......@@ -105,14 +105,14 @@ int vnodeProcessCreateMeterRequest(char *pMsg, int msgLen, SMgmtObj *pObj) {
if (vid >= TSDB_MAX_VNODES || vid < 0) {
dError("vid:%d, vnode is out of range", vid);
code = TSDB_CODE_INVALID_SESSION_ID;
code = TSDB_CODE_INVALID_VNODE_ID;
goto _over;
}
pVnode = vnodeList + vid;
if (pVnode->cfg.maxSessions <= 0) {
dError("vid:%d, not activated", vid);
code = TSDB_CODE_NOT_ACTIVE_SESSION;
code = TSDB_CODE_NOT_ACTIVE_VNODE;
goto _over;
}
......@@ -141,27 +141,27 @@ int vnodeProcessAlterStreamRequest(char *pMsg, int msgLen, SMgmtObj *pObj) {
if (vid >= TSDB_MAX_VNODES || vid < 0) {
dError("vid:%d, vnode is out of range", vid);
code = TSDB_CODE_INVALID_SESSION_ID;
code = TSDB_CODE_INVALID_VNODE_ID;
goto _over;
}
pVnode = vnodeList + vid;
if (pVnode->cfg.maxSessions <= 0 || pVnode->pCachePool == NULL) {
dError("vid:%d is not activated yet", pAlter->vnode);
code = TSDB_CODE_INVALID_SESSION_ID;
code = TSDB_CODE_NOT_ACTIVE_VNODE;
goto _over;
}
if (pAlter->sid >= pVnode->cfg.maxSessions || pAlter->sid < 0) {
dError("vid:%d sid:%d uid:%ld, sid is out of range", pAlter->vnode, pAlter->sid, pAlter->uid);
code = TSDB_CODE_INVALID_SESSION_ID;
code = TSDB_CODE_INVALID_TABLE_ID;
goto _over;
}
SMeterObj *pMeterObj = vnodeList[vid].meterList[sid];
if (pMeterObj == NULL || sid != pMeterObj->sid || vid != pMeterObj->vnode) {
dError("vid:%d sid:%d, no active session", vid, sid);
code = TSDB_CODE_NOT_ACTIVE_SESSION;
dError("vid:%d sid:%d, no active table", vid, sid);
code = TSDB_CODE_NOT_ACTIVE_TABLE;
goto _over;
}
......@@ -195,7 +195,7 @@ int vnodeProcessCreateMeterMsg(char *pMsg, int msgLen) {
if (pCreate->vnode >= TSDB_MAX_VNODES || pCreate->vnode < 0) {
dError("vid:%d is out of range", pCreate->vnode);
code = TSDB_CODE_INVALID_SESSION_ID;
code = TSDB_CODE_INVALID_VNODE_ID;
goto _create_over;
}
......@@ -203,13 +203,13 @@ int vnodeProcessCreateMeterMsg(char *pMsg, int msgLen) {
if (pVnode->pCachePool == NULL) {
dError("vid:%d is not activated yet", pCreate->vnode);
vnodeSendVpeerCfgMsg(pCreate->vnode);
code = TSDB_CODE_NOT_ACTIVE_SESSION;
code = TSDB_CODE_NOT_ACTIVE_VNODE;
goto _create_over;
}
if (pCreate->sid >= pVnode->cfg.maxSessions || pCreate->sid < 0) {
dError("vid:%d sid:%d id:%s, sid is out of range", pCreate->vnode, pCreate->sid, pCreate->meterId);
code = TSDB_CODE_INVALID_SESSION_ID;
code = TSDB_CODE_INVALID_TABLE_ID;
goto _create_over;
}
......
......@@ -873,7 +873,7 @@ int mgmtProcessRetrieveMsg(char *pMsg, int msgLen, SConnObj *pConn) {
taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_RETRIEVE_RSP, TSDB_CODE_MEMORY_CORRUPTED);
return -1;
} else {
if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) == 0) {
if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) {
rowsToRead = pShow->numOfRows - pShow->numOfReads;
}
......@@ -905,7 +905,7 @@ int mgmtProcessRetrieveMsg(char *pMsg, int msgLen, SConnObj *pConn) {
pMsg = pRsp->data;
// if free flag is set, client wants to clean the resources
if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) == 0)
if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE)
rowsRead = (*mgmtRetrieveFp[pShow->type])(pShow, pRsp->data, rowsToRead, pConn);
if (rowsRead < 0) {
......
......@@ -618,7 +618,7 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
dWarn("vid:%d sid:%d id:%s, meter is dropped, abort insert, state:%d", pObj->vnode, pObj->sid, pObj->meterId,
pObj->state);
code = TSDB_CODE_NOT_ACTIVE_SESSION;
code = TSDB_CODE_NOT_ACTIVE_TABLE;
break;
}
......
......@@ -1840,7 +1840,7 @@ static void setCtxTagColumnInfo(SQuery* pQuery, SQueryRuntimeEnv* pRuntimeEnv) {
// ts may be the required primary timestamp column
continue;
} else {
assert(0);
// the column may be the normal column, group by normal_column, the functionId is TSDB_FUNC_PRJ
}
}
......
......@@ -269,7 +269,7 @@ int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj) {
if (pQueryMsg->vnode >= TSDB_MAX_VNODES || pQueryMsg->vnode < 0) {
dTrace("qmsg:%p,vid:%d is out of range", pQueryMsg, pQueryMsg->vnode);
code = TSDB_CODE_INVALID_SESSION_ID;
code = TSDB_CODE_INVALID_TABLE_ID;
goto _query_over;
}
......@@ -278,7 +278,7 @@ int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj) {
if (pVnode->cfg.maxSessions == 0) {
dError("qmsg:%p,vid:%d is not activated yet", pQueryMsg, pQueryMsg->vnode);
vnodeSendVpeerCfgMsg(pQueryMsg->vnode);
code = TSDB_CODE_NOT_ACTIVE_SESSION;
code = TSDB_CODE_NOT_ACTIVE_TABLE;
goto _query_over;
}
......@@ -295,7 +295,7 @@ int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj) {
if (pVnode->meterList == NULL) {
dError("qmsg:%p,vid:%d has been closed", pQueryMsg, pQueryMsg->vnode);
code = TSDB_CODE_NOT_ACTIVE_SESSION;
code = TSDB_CODE_NOT_ACTIVE_VNODE;
goto _query_over;
}
......@@ -305,7 +305,7 @@ int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj) {
dTrace("qmsg:%p sid:%d is out of range, valid range:[%d,%d]", pQueryMsg, pSids[i]->sid, 0,
pVnode->cfg.maxSessions);
code = TSDB_CODE_INVALID_SESSION_ID;
code = TSDB_CODE_INVALID_TABLE_ID;
goto _query_over;
}
}
......@@ -488,7 +488,7 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) {
if (pSubmit->vnode >= TSDB_MAX_VNODES || pSubmit->vnode < 0) {
dTrace("vnode:%d is out of range", pSubmit->vnode);
code = TSDB_CODE_INVALID_SESSION_ID;
code = TSDB_CODE_INVALID_VNODE_ID;
goto _submit_over;
}
......@@ -496,7 +496,7 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) {
if (pVnode->cfg.maxSessions == 0 || pVnode->meterList == NULL) {
dError("vid:%d is not activated for submit", pSubmit->vnode);
vnodeSendVpeerCfgMsg(pSubmit->vnode);
code = TSDB_CODE_NOT_ACTIVE_SESSION;
code = TSDB_CODE_NOT_ACTIVE_VNODE;
goto _submit_over;
}
......@@ -529,7 +529,7 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) {
if (pBlocks->sid >= pVnode->cfg.maxSessions || pBlocks->sid <= 0) {
dTrace("sid:%d is out of range", pBlocks->sid);
code = TSDB_CODE_INVALID_SESSION_ID;
code = TSDB_CODE_INVALID_TABLE_ID;
goto _submit_over;
}
......@@ -538,9 +538,9 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) {
SMeterObj *pMeterObj = vnodeList[vnode].meterList[sid];
if (pMeterObj == NULL) {
dError("vid:%d sid:%d, no active session", vnode, sid);
dError("vid:%d sid:%d, no active table", vnode, sid);
vnodeSendMeterCfgMsg(vnode, sid);
code = TSDB_CODE_NOT_ACTIVE_SESSION;
code = TSDB_CODE_NOT_ACTIVE_TABLE;
goto _submit_over;
}
......@@ -579,7 +579,7 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) {
if (vnodeIsMeterState(pMeterObj, TSDB_METER_STATE_DELETING)) {
dTrace("vid:%d sid:%d id:%s, it is removed, state:%d", pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId,
pMeterObj->state);
code = TSDB_CODE_NOT_ACTIVE_SESSION;
code = TSDB_CODE_NOT_ACTIVE_TABLE;
break;
} else {// waiting for 300ms by default and try again
dTrace("vid:%d sid:%d id:%s, try submit again since in state:%d", pMeterObj->vnode, pMeterObj->sid,
......
......@@ -553,7 +553,7 @@ int32_t vnodeIncQueryRefCount(SQueryMeterMsg* pQueryMsg, SMeterSidExtInfo** pSid
if (pMeter == NULL || (pMeter->state > TSDB_METER_STATE_INSERT)) {
if (pMeter == NULL || vnodeIsMeterState(pMeter, TSDB_METER_STATE_DELETING)) {
code = TSDB_CODE_NOT_ACTIVE_SESSION;
code = TSDB_CODE_NOT_ACTIVE_TABLE;
dError("qmsg:%p, vid:%d sid:%d, not there or will be dropped", pQueryMsg, pQueryMsg->vnode, pSids[i]->sid);
vnodeSendMeterCfgMsg(pQueryMsg->vnode, pSids[i]->sid);
} else {//update or import
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册