提交 d276b59c 编写于 作者: S slguan

Merge branch 'develop' into feature/slguan

......@@ -63,7 +63,7 @@ Query OK, 2 row(s) in set (0.001700s)</code></pre>
<a class='anchor' id='主要功能'></a><h2>主要功能</h2>
<p>TDengine的核心功能是时序数据库。除此之外,为减少研发的复杂度、系统维护的难度,TDengine还提供缓存、消息队列、订阅、流式计算等功能。更详细的功能如下:</p>
<ul>
<li>使用类SQL语言插入或查询数据</li>
<li>使用类SQL语言插入或查询数据</li>
<li>支持C/C++, Java(JDBC), Python, Go, RESTful, and Node.JS 开发接口</li>
<li>可通过Python/R/Matlab or TDengine shell做Ad Hoc查询分析</li>
<li>通过定时连续查询支持基于滑动窗口的流式计算</li>
......
......@@ -24,7 +24,7 @@ tags (location binary(20), type int)</code></pre>
<p>说明:</p>
<ol>
<li>TAGS列总长度不能超过512 bytes;</li>
<li>TAGS列的数据类型不能是timestamp和nchar类型;</li>
<li>TAGS列的数据类型不能是timestamp类型;</li>
<li>TAGS列名不能与其他列名相同;</li>
<li>TAGS列名不能为预留关键字. </li></ol></li>
<li><p>显示已创建的超级表</p>
......@@ -40,7 +40,7 @@ tags (location binary(20), type int)</code></pre>
<p>统计属于某个STable并满足查询条件的子表的数量</p></li>
</ul>
<a class='anchor' id='写数据时自动建子表'></a><h2>写数据时自动建子表</h2>
<p>在某些特殊场景中,用户在写数据时并不确定某个设备的表是否存在,此时可使用自动建表语法来实现写入数据时用超级表定义的表结构自动创建不存在的子表,若该表已存在则不会建立新表。注意:自动建表语句只能自动建立子表而不能建立超级表,这就要求超级表已经被事先定义好。自动建表语法跟insert/import语法非常相似,唯一区别是语句中增加了超级表和标签信息。具体语法如下:</p>
<p>在某些特殊场景中,用户在写数据时并不确定某个设备的表是否存在,此时可使用自动建表语法来实现写入数据时用超级表定义的表结构自动创建不存在的子表,若该表已存在则不会建立新表。注意:自动建表语句只能自动建立子表而不能建立超级表,这就要求超级表已经被事先定义好。自动建表语法跟insert/import语法非常相似,唯一区别是语句中增加了超级表和标签信息。具体语法如下:</p>
<pre><code class="mysql language-mysql">INSERT INTO &lt;tb_name&gt; USING &lt;stb_name&gt; TAGS (&lt;tag1_value&gt;, ...) VALUES (field_value, ...) (field_value, ...) ...;</code></pre>
<p>向表tb_name中插入一条或多条记录,如果tb_name这张表不存在,则会用超级表stb_name定义的表结构以及用户指定的标签值(即tag1_value…)来创建名为tb_name新表,并将用户指定的值写入表中。如果tb_name已经存在,则建表过程会被忽略,系统也不会检查tb_name的标签是否与用户指定的标签值一致,也即不会更新已存在表的标签。</p>
<pre><code class="mysql language-mysql">INSERT INTO &lt;tb1_name&gt; USING &lt;stb1_name&gt; TAGS (&lt;tag1_value1&gt;, ...) VALUES (&lt;field1_value1&gt;, ...) (&lt;field1_value2&gt;, ...) ... &lt;tb_name2&gt; USING &lt;stb_name2&gt; TAGS(&lt;tag1_value2&gt;, ...) VALUES (&lt;field1_value1&gt;, ...) ...;</code></pre>
......@@ -105,6 +105,6 @@ GROUP BY location, type </code></pre>
<p>查询仅位于北京以外地区的温度传感器最近24小时(24h)采样值的数量count(*)、平均温度avg(degree)、最高温度max(degree)和最低温度min(degree),将采集结果按照10分钟为周期进行聚合,并将结果按所处地域(location)和传感器类型(type)再次进行聚合。</p>
<pre><code class="mysql language-mysql">SELECT COUNT(*), AVG(degree), MAX(degree), MIN(degree)
FROM thermometer
WHERE name&lt;&gt;'beijing' and ts&gt;=now-1d
WHERE location&lt;&gt;'beijing' and ts&gt;=now-1d
INTERVAL(10M)
GROUP BY location, type</code></pre><a href='../index.html'>回去</a></section></main></div><?php include($s.'/footer.php'); ?><script>$('pre').addClass('prettyprint linenums');PR.prettyPrint()</script><script src='lib/docs/liner.js'></script></body></html>
......@@ -30,10 +30,10 @@ extern "C" {
#include "tsdb.h"
#include "tscSecondaryMerge.h"
#define UTIL_METER_IS_METRIC(cmd) (((cmd)->pMeterMeta != NULL) && ((cmd)->pMeterMeta->meterType == TSDB_METER_METRIC))
#define UTIL_METER_IS_NOMRAL_METER(cmd) (!(UTIL_METER_IS_METRIC(cmd)))
#define UTIL_METER_IS_CREATE_FROM_METRIC(cmd) \
(((cmd)->pMeterMeta != NULL) && ((cmd)->pMeterMeta->meterType == TSDB_METER_MTABLE))
#define UTIL_METER_IS_METRIC(metaInfo) (((metaInfo)->pMeterMeta != NULL) && ((metaInfo)->pMeterMeta->meterType == TSDB_METER_METRIC))
#define UTIL_METER_IS_NOMRAL_METER(metaInfo) (!(UTIL_METER_IS_METRIC(metaInfo)))
#define UTIL_METER_IS_CREATE_FROM_METRIC(metaInfo) \
(((metaInfo)->pMeterMeta != NULL) && ((metaInfo)->pMeterMeta->meterType == TSDB_METER_MTABLE))
#define TSDB_COL_IS_TAG(f) (((f)&TSDB_COL_TAG) != 0)
......
......@@ -1482,7 +1482,9 @@ static void first_function_f(SQLFunctionCtx *pCtx, int32_t index) {
SET_VAL(pCtx, 1, 1);
memcpy(pCtx->aOutputBuf, pData, pCtx->inputBytes);
DO_UPDATE_TAG_COLUMNS(pCtx, 0);
TSKEY ts = pCtx->ptsList[index];
DO_UPDATE_TAG_COLUMNS(pCtx, ts);
SResultInfo *pInfo = GET_RES_INFO(pCtx);
pInfo->hasResult = DATA_SET_FLAG;
......@@ -1575,7 +1577,7 @@ static void first_dist_func_merge(SQLFunctionCtx *pCtx) {
SFirstLastInfo *pOutput = (SFirstLastInfo *)(pCtx->aOutputBuf + pCtx->inputBytes);
if (pOutput->hasResult != DATA_SET_FLAG || pInput->ts < pOutput->ts) {
memcpy(pCtx->aOutputBuf, pData, pCtx->inputBytes + sizeof(SFirstLastInfo));
DO_UPDATE_TAG_COLUMNS(pCtx, 0);
DO_UPDATE_TAG_COLUMNS(pCtx, pInput->ts);
}
}
......@@ -1623,7 +1625,9 @@ static void last_function(SQLFunctionCtx *pCtx) {
}
memcpy(pCtx->aOutputBuf, data, pCtx->inputBytes);
DO_UPDATE_TAG_COLUMNS(pCtx, 0);
TSKEY ts = pCtx->ptsList[i];
DO_UPDATE_TAG_COLUMNS(pCtx, ts);
SResultInfo *pInfo = GET_RES_INFO(pCtx);
pInfo->hasResult = DATA_SET_FLAG;
......@@ -1648,7 +1652,9 @@ static void last_function_f(SQLFunctionCtx *pCtx, int32_t index) {
SET_VAL(pCtx, 1, 1);
memcpy(pCtx->aOutputBuf, pData, pCtx->inputBytes);
DO_UPDATE_TAG_COLUMNS(pCtx, 0);
TSKEY ts = pCtx->ptsList[index];
DO_UPDATE_TAG_COLUMNS(pCtx, ts);
SResultInfo *pResInfo = GET_RES_INFO(pCtx);
pResInfo->hasResult = DATA_SET_FLAG;
......@@ -1745,7 +1751,7 @@ static void last_dist_func_merge(SQLFunctionCtx *pCtx) {
if (pOutput->hasResult != DATA_SET_FLAG || pOutput->ts < pInput->ts) {
memcpy(pCtx->aOutputBuf, pData, pCtx->inputBytes + sizeof(SFirstLastInfo));
DO_UPDATE_TAG_COLUMNS(pCtx, 0);
DO_UPDATE_TAG_COLUMNS(pCtx, pInput->ts);
}
}
......@@ -1800,7 +1806,7 @@ static void last_row_function(SQLFunctionCtx *pCtx) {
pInfo1->ts = pCtx->param[0].i64Key;
pInfo1->hasResult = DATA_SET_FLAG;
DO_UPDATE_TAG_COLUMNS(pCtx, 0);
DO_UPDATE_TAG_COLUMNS(pCtx, pInfo1->ts);
}
SET_VAL(pCtx, pCtx->size, 1);
......@@ -3779,9 +3785,6 @@ static void getStatics_i64(int64_t *primaryKey, int64_t *data, int32_t numOfRow,
assert(numOfRow <= INT16_MAX);
int64_t lastKey = 0;
int64_t lastVal = TSDB_DATA_BIGINT_NULL;
for (int32_t i = 0; i < numOfRow; ++i) {
if (isNull(&data[i], TSDB_DATA_TYPE_BIGINT)) {
(*numOfNull) += 1;
......@@ -3873,9 +3876,6 @@ static void getStatics_d(int64_t *primaryKey, double *data, int32_t numOfRow, do
assert(numOfRow <= INT16_MAX);
int64_t lastKey = 0;
double lastVal = TSDB_DATA_DOUBLE_NULL;
for (int32_t i = 0; i < numOfRow; ++i) {
if (isNull(&data[i], TSDB_DATA_TYPE_DOUBLE)) {
(*numOfNull) += 1;
......
......@@ -2806,59 +2806,6 @@ static bool functionCompatibleCheck(SSqlCmd* pCmd) {
}
}
// additional check for select aggfuntion(column), column1 from table_name group by(column1);
if ((pCmd->type & TSDB_QUERY_TYPE_PROJECTION_QUERY) == TSDB_QUERY_TYPE_PROJECTION_QUERY) {
bool isAggFunc = false;
for (int32_t i = 0; i < pCmd->fieldsInfo.numOfOutputCols; ++i) {
int16_t functionId = tscSqlExprGet(pCmd, i)->functionId;
if (functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_TAGPRJ || functionId == TSDB_FUNC_TS ||
functionId == TSDB_FUNC_ARITHM) {
continue;
}
if ((aAggs[functionId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) == 0) {
isAggFunc = true;
break;
}
}
// TODO change the type, the type is not correct
if (isAggFunc) {
pCmd->type &= (~TSDB_QUERY_TYPE_PROJECTION_QUERY);
// agg function mixed up with project query without group by exists
if (pCmd->groupbyExpr.numOfGroupCols == 0) {
return false;
}
// get the project column
int32_t numOfPrjColumn = 0;
for (int32_t i = 0; i < pCmd->fieldsInfo.numOfOutputCols; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pCmd, i);
if (pExpr->functionId == TSDB_FUNC_PRJ) {
numOfPrjColumn += 1;
bool qualifiedCol = false;
for (int32_t j = 0; j < pCmd->groupbyExpr.numOfGroupCols; ++j) {
if (pExpr->colInfo.colId == pCmd->groupbyExpr.columnInfo[j].colId) {
qualifiedCol = true;
pExpr->param[0].i64Key = 1; // limit the output to be 1 for each state value
pExpr->numOfParams = 1;
break;
}
}
if (!qualifiedCol) {
setErrMsg(pCmd, msg1);
return false;
}
}
}
}
}
return true;
}
......@@ -5416,6 +5363,27 @@ static void doUpdateSqlFunctionForTagPrj(SSqlCmd* pCmd) {
}
}
static void doUpdateSqlFunctionForColPrj(SSqlCmd* pCmd) {
for (int32_t i = 0; i < pCmd->fieldsInfo.numOfOutputCols; ++i) {
SSqlExpr *pExpr = tscSqlExprGet(pCmd, i);
if (pExpr->functionId == TSDB_FUNC_PRJ) {
bool qualifiedCol = false;
for (int32_t j = 0; j < pCmd->groupbyExpr.numOfGroupCols; ++j) {
if (pExpr->colInfo.colId == pCmd->groupbyExpr.columnInfo[j].colId) {
qualifiedCol = true;
pExpr->param[0].i64Key = 1; // limit the output to be 1 for each state value
pExpr->numOfParams = 1;
break;
}
}
assert(qualifiedCol);
}
}
}
static bool tagColumnInGroupby(SSqlGroupbyExpr* pGroupbyExpr, int16_t columnId) {
for (int32_t j = 0; j < pGroupbyExpr->numOfGroupCols; ++j) {
if (columnId == pGroupbyExpr->columnInfo[j].colId && pGroupbyExpr->columnInfo[j].flag == TSDB_COL_TAG) {
......@@ -5480,7 +5448,8 @@ static void updateTagPrjFunction(SSqlCmd* pCmd) {
static int32_t checkUpdateTagPrjFunctions(SSqlCmd* pCmd) {
const char* msg1 = "only one selectivity function allowed in presence of tags function";
const char* msg2 = "functions not allowed";
const char* msg3 = "aggregation function should not be mixed up with projection";
bool tagColExists = false;
int16_t numOfTimestamp = 0; // primary timestamp column
int16_t numOfSelectivity = 0;
......@@ -5494,21 +5463,21 @@ static int32_t checkUpdateTagPrjFunctions(SSqlCmd* pCmd) {
break;
}
}
if (tagColExists) { // check if the selectivity function exists
for (int32_t i = 0; i < pCmd->fieldsInfo.numOfOutputCols; ++i) {
int16_t functionId = tscSqlExprGet(pCmd, i)->functionId;
if (functionId == TSDB_FUNC_TAGPRJ || functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_TS) {
continue;
}
if ((aAggs[functionId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) != 0) {
numOfSelectivity++;
} else {
numOfAggregation++;
}
for (int32_t i = 0; i < pCmd->fieldsInfo.numOfOutputCols; ++i) {
int16_t functionId = tscSqlExprGet(pCmd, i)->functionId;
if (functionId == TSDB_FUNC_TAGPRJ || functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_TS) {
continue;
}
if ((aAggs[functionId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) != 0) {
numOfSelectivity++;
} else {
numOfAggregation++;
}
}
if (tagColExists) { // check if the selectivity function exists
// When the tag projection function on tag column that is not in the group by clause, aggregation function and
// selectivity function exist in select clause is not allowed.
if (numOfAggregation > 0) {
......@@ -5521,6 +5490,7 @@ static int32_t checkUpdateTagPrjFunctions(SSqlCmd* pCmd) {
*/
if (numOfSelectivity == 1) {
doUpdateSqlFunctionForTagPrj(pCmd);
doUpdateSqlFunctionForColPrj(pCmd);
} else if (numOfSelectivity > 1) {
/*
* If more than one selectivity functions exist, all the selectivity functions must be last_row.
......@@ -5539,6 +5509,20 @@ static int32_t checkUpdateTagPrjFunctions(SSqlCmd* pCmd) {
}
doUpdateSqlFunctionForTagPrj(pCmd);
doUpdateSqlFunctionForColPrj(pCmd);
}
} else {
if ((pCmd->type & TSDB_QUERY_TYPE_PROJECTION_QUERY) == TSDB_QUERY_TYPE_PROJECTION_QUERY) {
if (numOfAggregation > 0 && pCmd->groupbyExpr.numOfGroupCols == 0) {
setErrMsg(pCmd, msg3);
return TSDB_CODE_INVALID_SQL;
}
if (numOfAggregation > 0 || numOfSelectivity > 0) {
// clear the projection type flag
pCmd->type &= (~TSDB_QUERY_TYPE_PROJECTION_QUERY);
doUpdateSqlFunctionForColPrj(pCmd);
}
}
}
......@@ -5668,8 +5652,7 @@ int32_t doFunctionsCompatibleCheck(SSqlObj* pSql) {
}
if (IS_MULTIOUTPUT(aAggs[functId].nStatus) && functId != TSDB_FUNC_TOP && functId != TSDB_FUNC_BOTTOM &&
functId != TSDB_FUNC_TAGPRJ &&
(functId == TSDB_FUNC_PRJ && pExpr->colInfo.colId != PRIMARYKEY_TIMESTAMP_COL_INDEX)) {
functId != TSDB_FUNC_TAGPRJ && functId != TSDB_FUNC_PRJ) {
setErrMsg(pCmd, msg1);
return TSDB_CODE_INVALID_SQL;
}
......@@ -5697,6 +5680,8 @@ int32_t doFunctionsCompatibleCheck(SSqlObj* pSql) {
setErrMsg(pCmd, msg3);
return TSDB_CODE_INVALID_SQL;
}
return TSDB_CODE_SUCCESS;
} else {
return checkUpdateTagPrjFunctions(pCmd);
}
......
......@@ -134,6 +134,7 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
tscProcessSql(pObj->pHb);
}
//TODO HANDLE error from mgmt
void tscGetConnToMgmt(SSqlObj *pSql, uint8_t *pCode) {
STscObj *pTscObj = pSql->pTscObj;
#ifdef CLUSTER
......@@ -163,10 +164,11 @@ void tscGetConnToMgmt(SSqlObj *pSql, uint8_t *pCode) {
connInit.spi = 1;
connInit.encrypt = 0;
connInit.secret = pSql->pTscObj->pass;
#ifdef CLUSTER
connInit.peerIp = tscMgmtIpList.ipstr[pSql->index];
#else
connInit.peerIp = tsServerIpStr;
connInit.peerIp = tsServerIpStr;
#endif
thandle = taosOpenRpcConn(&connInit, pCode);
}
......@@ -222,7 +224,7 @@ void tscGetConnToVnode(SSqlObj *pSql, uint8_t *pCode) {
(pSql->index) = (pSql->index + 1) % TSDB_VNODES_SUPPORT;
continue;
}
*pCode = 0;
*pCode = TSDB_CODE_SUCCESS;
void *thandle =
taosGetConnFromCache(tscConnCache, pVPeersDesc[pSql->index].ip, pVPeersDesc[pSql->index].vnode, pTscObj->user);
......@@ -248,7 +250,7 @@ void tscGetConnToVnode(SSqlObj *pSql, uint8_t *pCode) {
pSql->thandle = thandle;
pSql->ip = pVPeersDesc[pSql->index].ip;
pSql->vnode = pVPeersDesc[pSql->index].vnode;
tscTrace("%p vnode:%d ip:0x%x index:%d is picked up, pConn:%p", pSql, pVPeersDesc[pSql->index].vnode,
tscTrace("%p vnode:%d ip:%p index:%d is picked up, pConn:%p", pSql, pVPeersDesc[pSql->index].vnode,
pVPeersDesc[pSql->index].ip, pSql->index, pSql->thandle);
#else
*pCode = 0;
......@@ -278,6 +280,15 @@ void tscGetConnToVnode(SSqlObj *pSql, uint8_t *pCode) {
break;
}
// the pSql->res.code is the previous error code.
if (pSql->thandle == NULL && pSql->retry >= pSql->maxRetry) {
if (pSql->res.code != TSDB_CODE_SUCCESS) {
*pCode = pSql->res.code;
}
tscError("%p reach the max retry:%d, code:%d", pSql, pSql->retry, *pCode);
}
}
int tscSendMsgToServer(SSqlObj *pSql) {
......@@ -313,11 +324,19 @@ int tscSendMsgToServer(SSqlObj *pSql) {
char *pStart = taosBuildReqHeader(pSql->thandle, pSql->cmd.msgType, buf);
if (pStart) {
/*
* this SQL object may be released by other thread due to the completion of this query even before the log
* is dumped to log file. So the signature needs to be kept in a local variable.
*/
uint64_t signature = (uint64_t) pSql->signature;
if (tscUpdateVnodeMsg[pSql->cmd.command]) (*tscUpdateVnodeMsg[pSql->cmd.command])(pSql, buf);
int ret = taosSendMsgToPeerH(pSql->thandle, pStart, pSql->cmd.payloadLen, pSql);
if (ret >= 0) code = 0;
tscTrace("%p send msg ret:%d code:%d sig:%p", pSql, ret, code, pSql->signature);
if (ret >= 0) {
code = 0;
}
tscTrace("%p send msg ret:%d code:%d sig:%p", pSql, ret, code, signature);
}
}
......@@ -385,14 +404,11 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) {
// for single node situation, do NOT try next index
#endif
pSql->thandle = NULL;
// todo taos_stop_query() in async model
/*
* in case of
* 1. query cancelled(pRes->code != TSDB_CODE_QUERY_CANCELLED), do NOT re-issue the
* request to server.
* 2. retrieve, do NOT re-issue the retrieve request since the qhandle may
* have been released by server
* 1. query cancelled(pRes->code != TSDB_CODE_QUERY_CANCELLED), do NOT re-issue the request to server.
* 2. retrieve, do NOT re-issue the retrieve request since the qhandle may have been released by server
*/
if (pCmd->command != TSDB_SQL_FETCH && pCmd->command != TSDB_SQL_RETRIEVE && pCmd->command != TSDB_SQL_KILL_QUERY &&
pRes->code != TSDB_CODE_QUERY_CANCELLED) {
......@@ -418,8 +434,11 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) {
}
}
} else {
uint16_t rspCode = pMsg->content[0];
#ifdef CLUSTER
if (pMsg->content[0] == TSDB_CODE_REDIRECT) {
if (rspCode == TSDB_CODE_REDIRECT) {
tscTrace("%p it shall be redirected!", pSql);
taosAddConnIntoCache(tscConnCache, thandle, pSql->ip, pSql->vnode, pObj->user);
pSql->thandle = NULL;
......@@ -433,28 +452,23 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) {
code = tscSendMsgToServer(pSql);
if (code == 0) return pSql;
msg = NULL;
} else if (pMsg->content[0] == TSDB_CODE_NOT_ACTIVE_SESSION || pMsg->content[0] == TSDB_CODE_NETWORK_UNAVAIL ||
pMsg->content[0] == TSDB_CODE_INVALID_SESSION_ID) {
} else if (rspCode == TSDB_CODE_NOT_ACTIVE_TABLE || rspCode == TSDB_CODE_INVALID_TABLE_ID ||
rspCode == TSDB_CODE_INVALID_VNODE_ID || rspCode == TSDB_CODE_NOT_ACTIVE_VNODE ||
rspCode == TSDB_CODE_NETWORK_UNAVAIL) {
#else
if (pMsg->content[0] == TSDB_CODE_NOT_ACTIVE_SESSION || pMsg->content[0] == TSDB_CODE_NETWORK_UNAVAIL ||
pMsg->content[0] == TSDB_CODE_INVALID_SESSION_ID) {
if (rspCode == TSDB_CODE_NOT_ACTIVE_TABLE || rspCode == TSDB_CODE_INVALID_TABLE_ID ||
rspCode == TSDB_CODE_INVALID_VNODE_ID || rspCode == TSDB_CODE_NOT_ACTIVE_VNODE ||
rspCode == TSDB_CODE_NETWORK_UNAVAIL) {
#endif
pSql->thandle = NULL;
taosAddConnIntoCache(tscConnCache, thandle, pSql->ip, pSql->vnode, pObj->user);
if (pMeterMetaInfo != NULL && UTIL_METER_IS_METRIC(pMeterMetaInfo) &&
pMsg->content[0] == TSDB_CODE_NOT_ACTIVE_SESSION) {
/*
* for metric query, in case of any meter missing during query, sub-query of metric query will failed,
* causing metric query failed, and return TSDB_CODE_METRICMETA_EXPIRED code to app
*/
tscTrace("%p invalid meters id cause metric query failed, code:%d", pSql, pMsg->content[0]);
code = TSDB_CODE_METRICMETA_EXPIRED;
} else if ((pCmd->command == TSDB_SQL_INSERT || pCmd->command == TSDB_SQL_SELECT) &&
pMsg->content[0] == TSDB_CODE_INVALID_SESSION_ID) {
if ((pCmd->command == TSDB_SQL_INSERT || pCmd->command == TSDB_SQL_SELECT) &&
(rspCode == TSDB_CODE_INVALID_TABLE_ID || rspCode == TSDB_CODE_INVALID_VNODE_ID)) {
/*
* session id is invalid(e.g., less than 0 or larger than maximum session per
* vnode) in submit/query msg, no retry
* In case of the insert/select operations, the invalid table(vnode) id means
* the submit/query msg is invalid, renew meter meta will not help to fix this problem,
* so return the invalid_query_msg to client directly.
*/
code = TSDB_CODE_INVALID_QUERY_MSG;
} else if (pCmd->command == TSDB_SQL_CONNECT) {
......@@ -462,9 +476,11 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) {
} else if (pCmd->command == TSDB_SQL_HB) {
code = TSDB_CODE_NOT_READY;
} else {
tscTrace("%p it shall renew meter meta, code:%d", pSql, pMsg->content[0]);
tscTrace("%p it shall renew meter meta, code:%d", pSql, rspCode);
pSql->maxRetry = TSDB_VNODES_SUPPORT * 2;
pSql->res.code = (uint8_t) rspCode; // keep the previous error code
code = tscRenewMeterMeta(pSql, pMeterMetaInfo->name);
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return pSql;
......@@ -476,7 +492,7 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) {
msg = NULL;
} else { // for other error set and return to invoker
code = pMsg->content[0];
code = rspCode;
}
}
......@@ -723,9 +739,16 @@ int tscProcessSql(SSqlObj *pSql) {
#else
pSql->maxRetry = 2;
#endif
// the pMeterMetaInfo cannot be NULL
if (pMeterMetaInfo == NULL) {
pSql->res.code = TSDB_CODE_OTHERS;
return pSql->res.code;
}
if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) {
pSql->index = pMeterMetaInfo->pMeterMeta->index;
} else { // it must be the parent SSqlObj for metric query
} else { // it must be the parent SSqlObj for super table query
if ((pSql->cmd.type & TSDB_QUERY_TYPE_SUBQUERY) != 0) {
int32_t idx = pSql->cmd.vnodeIdx;
SVnodeSidList *pSidList = tscGetVnodeSidList(pMeterMetaInfo->pMetricMeta, idx);
......@@ -2460,10 +2483,10 @@ int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql) {
pMsg += sizeof(SMgmtHead);
*((uint64_t *)pMsg) = pSql->res.qhandle;
*((uint64_t *) pMsg) = pSql->res.qhandle;
pMsg += sizeof(pSql->res.qhandle);
*pMsg = htons(pCmd->type);
*((uint16_t*) pMsg) = htons(pCmd->type);
pMsg += sizeof(pCmd->type);
msgLen = pMsg - pStart;
......@@ -3451,11 +3474,12 @@ int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) {
pRes->row = 0;
/**
* If the query result is exhausted, the connection will be recycled.
* If current query is to free resource at server side, the connection will be recycle.
* If the query result is exhausted, or current query is to free resource at server side,
* the connection will be recycled.
*/
if ((pRes->numOfRows == 0 && !(tscProjectionQueryOnMetric(pCmd) && pRes->offset > 0)) ||
((pCmd->type & TSDB_QUERY_TYPE_FREE_RESOURCE) == TSDB_QUERY_TYPE_FREE_RESOURCE)) {
tscTrace("%p no result or free resource, recycle connection", pSql);
taosAddConnIntoCache(tscConnCache, pSql->thandle, pSql->ip, pSql->vnode, pObj->user);
pSql->thandle = NULL;
} else {
......@@ -3769,7 +3793,7 @@ void tscInitMsgs() {
tscProcessMsgRsp[TSDB_SQL_MULTI_META] = tscProcessMultiMeterMetaRsp;
tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp;
tscProcessMsgRsp[TSDB_SQL_RETRIEVE] = tscProcessRetrieveRspFromMgmt;
tscProcessMsgRsp[TSDB_SQL_RETRIEVE] = tscProcessRetrieveRspFromVnode; // rsp handled by same function.
tscProcessMsgRsp[TSDB_SQL_DESCRIBE_TABLE] = tscProcessDescribeTableRsp;
tscProcessMsgRsp[TSDB_SQL_RETRIEVE_TAGS] = tscProcessTagRetrieveRsp;
tscProcessMsgRsp[TSDB_SQL_RETRIEVE_EMPTY_RESULT] = tscProcessEmptyResultRsp;
......
......@@ -647,11 +647,8 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
pCmd->limit.limit = pSql->cmd.globalLimit - pRes->numOfTotal;
pCmd->limit.offset = pRes->offset;
#ifdef CLUSTER
if ((++pSql->cmd.vnodeIdx) <= pMeterMetaInfo->pMetricMeta->numOfVnodes) {
#else
if ((++pSql->cmd.vnodeIdx) < pMeterMetaInfo->pMetricMeta->numOfVnodes) {
#endif
pSql->cmd.command = TSDB_SQL_SELECT;
assert(pSql->fp == NULL);
tscProcessSql(pSql);
......
......@@ -41,7 +41,7 @@ extern "C" {
#define TSDB_CODE_ACTION_NOT_ONLINE 18
#define TSDB_CODE_ACTION_SEND_FAILD 19
#define TSDB_CODE_NOT_ACTIVE_SESSION 20
#define TSDB_CODE_INSERT_FAILED 21
#define TSDB_CODE_INVALID_VNODE_ID 21
#define TSDB_CODE_APP_ERROR 22
#define TSDB_CODE_INVALID_IE 23
#define TSDB_CODE_INVALID_VALUE 24
......@@ -74,7 +74,7 @@ extern "C" {
#define TSDB_CODE_OTHERS 51
#define TSDB_CODE_NO_REMOVE_MASTER 52
#define TSDB_CODE_WRONG_SCHEMA 53
#define TSDB_CODE_NO_RESULT 54
#define TSDB_CODE_NOT_ACTIVE_VNODE 54
#define TSDB_CODE_TOO_MANY_USERS 55
#define TSDB_CODE_TOO_MANY_DATABSES 56
#define TSDB_CODE_TOO_MANY_TABLES 57
......@@ -134,6 +134,8 @@ extern "C" {
#define TSDB_CODE_NOT_SUPER_TABLE 111 //
#define TSDB_CODE_DUPLICATE_TAGS 112 // tags value for join not unique
#define TSDB_CODE_INVALID_SUBMIT_MSG 113
#define TSDB_CODE_NOT_ACTIVE_TABLE 114
#define TSDB_CODE_INVALID_TABLE_ID 115
// message type
#define TSDB_MSG_TYPE_REG 1
......@@ -673,7 +675,7 @@ typedef struct {
typedef struct {
uint64_t qhandle;
int16_t free;
uint16_t free;
} SRetrieveMeterMsg;
typedef struct {
......
......@@ -846,7 +846,7 @@ void shellGetGrantInfo(void *con) {
TAOS_FIELD *fields = taos_fetch_fields(result);
TAOS_ROW row = taos_fetch_row(result);
if (row == NULL) {
fprintf(stderr, "\nGrant information is empty.\n");
fprintf(stderr, "\nFailed to get grant information from server. Abort.\n");
exit(0);
}
......
......@@ -25,8 +25,10 @@ extern "C" {
#include <arpa/inet.h>
#include <assert.h>
#include <ctype.h>
#include <dirent.h>
#include <endian.h>
#include <errno.h>
#include <float.h>
#include <ifaddrs.h>
#include <limits.h>
......@@ -61,6 +63,7 @@ extern "C" {
#include <unistd.h>
#include <wchar.h>
#include <wordexp.h>
#include <wctype.h>
#define taosCloseSocket(x) \
{ \
......
......@@ -145,7 +145,7 @@ char *tsError[] = {"success",
"not online",
"send failed",
"not active session", // 20
"insert failed",
"invalid vnode id",
"App error",
"invalid IE",
"invalid value",
......@@ -178,7 +178,7 @@ char *tsError[] = {"success",
"others",
"can't remove dnode which is master",
"wrong schema",
"no results",
"vnode not active(not created yet or dropped already)",
"num of users execeed maxUsers", //55
"num of databases execeed maxDbs",
"num of tables execeed maxTables",
......@@ -233,9 +233,11 @@ char *tsError[] = {"success",
"invalid query message",
"timestamp disordered in cache block",
"timestamp disordered in file block",
"invalid commit log", //110
"server no disk space",
"invalid commit log",
"server no disk space", //110
"only super table has metric meta info",
"tags value not unique for join",
"invalid submit message",
"not active table(not created yet or deleted already)", //114
"invalid table id",
};
......@@ -105,14 +105,14 @@ int vnodeProcessCreateMeterRequest(char *pMsg, int msgLen, SMgmtObj *pObj) {
if (vid >= TSDB_MAX_VNODES || vid < 0) {
dError("vid:%d, vnode is out of range", vid);
code = TSDB_CODE_INVALID_SESSION_ID;
code = TSDB_CODE_INVALID_VNODE_ID;
goto _over;
}
pVnode = vnodeList + vid;
if (pVnode->cfg.maxSessions <= 0) {
dError("vid:%d, not activated", vid);
code = TSDB_CODE_NOT_ACTIVE_SESSION;
code = TSDB_CODE_NOT_ACTIVE_VNODE;
goto _over;
}
......@@ -141,27 +141,27 @@ int vnodeProcessAlterStreamRequest(char *pMsg, int msgLen, SMgmtObj *pObj) {
if (vid >= TSDB_MAX_VNODES || vid < 0) {
dError("vid:%d, vnode is out of range", vid);
code = TSDB_CODE_INVALID_SESSION_ID;
code = TSDB_CODE_INVALID_VNODE_ID;
goto _over;
}
pVnode = vnodeList + vid;
if (pVnode->cfg.maxSessions <= 0 || pVnode->pCachePool == NULL) {
dError("vid:%d is not activated yet", pAlter->vnode);
code = TSDB_CODE_INVALID_SESSION_ID;
code = TSDB_CODE_NOT_ACTIVE_VNODE;
goto _over;
}
if (pAlter->sid >= pVnode->cfg.maxSessions || pAlter->sid < 0) {
dError("vid:%d sid:%d uid:%ld, sid is out of range", pAlter->vnode, pAlter->sid, pAlter->uid);
code = TSDB_CODE_INVALID_SESSION_ID;
code = TSDB_CODE_INVALID_TABLE_ID;
goto _over;
}
SMeterObj *pMeterObj = vnodeList[vid].meterList[sid];
if (pMeterObj == NULL || sid != pMeterObj->sid || vid != pMeterObj->vnode) {
dError("vid:%d sid:%d, no active session", vid, sid);
code = TSDB_CODE_NOT_ACTIVE_SESSION;
dError("vid:%d sid:%d, no active table", vid, sid);
code = TSDB_CODE_NOT_ACTIVE_TABLE;
goto _over;
}
......@@ -195,7 +195,7 @@ int vnodeProcessCreateMeterMsg(char *pMsg, int msgLen) {
if (pCreate->vnode >= TSDB_MAX_VNODES || pCreate->vnode < 0) {
dError("vid:%d is out of range", pCreate->vnode);
code = TSDB_CODE_INVALID_SESSION_ID;
code = TSDB_CODE_INVALID_VNODE_ID;
goto _create_over;
}
......@@ -203,13 +203,13 @@ int vnodeProcessCreateMeterMsg(char *pMsg, int msgLen) {
if (pVnode->pCachePool == NULL) {
dError("vid:%d is not activated yet", pCreate->vnode);
vnodeSendVpeerCfgMsg(pCreate->vnode);
code = TSDB_CODE_NOT_ACTIVE_SESSION;
code = TSDB_CODE_NOT_ACTIVE_VNODE;
goto _create_over;
}
if (pCreate->sid >= pVnode->cfg.maxSessions || pCreate->sid < 0) {
dError("vid:%d sid:%d id:%s, sid is out of range", pCreate->vnode, pCreate->sid, pCreate->meterId);
code = TSDB_CODE_INVALID_SESSION_ID;
code = TSDB_CODE_INVALID_TABLE_ID;
goto _create_over;
}
......
......@@ -873,7 +873,7 @@ int mgmtProcessRetrieveMsg(char *pMsg, int msgLen, SConnObj *pConn) {
taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_RETRIEVE_RSP, TSDB_CODE_MEMORY_CORRUPTED);
return -1;
} else {
if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) == 0) {
if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) {
rowsToRead = pShow->numOfRows - pShow->numOfReads;
}
......@@ -905,7 +905,7 @@ int mgmtProcessRetrieveMsg(char *pMsg, int msgLen, SConnObj *pConn) {
pMsg = pRsp->data;
// if free flag is set, client wants to clean the resources
if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) == 0)
if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE)
rowsRead = (*mgmtRetrieveFp[pShow->type])(pShow, pRsp->data, rowsToRead, pConn);
if (rowsRead < 0) {
......
......@@ -363,19 +363,19 @@ int vnodeImportToFile(SImportInfo *pImport) {
SData *cdata[TSDB_MAX_COLUMNS];
char *buffer1 =
malloc(pObj->bytesPerPoint * pCfg->rowsInFileBlock + (sizeof(SData) + EXTRA_BYTES) * pObj->numOfColumns);
malloc(pObj->bytesPerPoint * pCfg->rowsInFileBlock + (sizeof(SData) + EXTRA_BYTES + sizeof(TSCKSUM)) * pObj->numOfColumns);
cdata[0] = (SData *)buffer1;
SData *data[TSDB_MAX_COLUMNS];
char *buffer2 =
malloc(pObj->bytesPerPoint * pCfg->rowsInFileBlock + (sizeof(SData) + EXTRA_BYTES) * pObj->numOfColumns);
malloc(pObj->bytesPerPoint * pCfg->rowsInFileBlock + (sizeof(SData) + EXTRA_BYTES + sizeof(TSCKSUM)) * pObj->numOfColumns);
data[0] = (SData *)buffer2;
for (col = 1; col < pObj->numOfColumns; ++col) {
cdata[col] = (SData *)(((char *)cdata[col - 1]) + sizeof(SData) + EXTRA_BYTES +
pObj->pointsPerFileBlock * pObj->schema[col - 1].bytes);
pObj->pointsPerFileBlock * pObj->schema[col - 1].bytes + sizeof(TSCKSUM));
data[col] = (SData *)(((char *)data[col - 1]) + sizeof(SData) + EXTRA_BYTES +
pObj->pointsPerFileBlock * pObj->schema[col - 1].bytes);
pObj->pointsPerFileBlock * pObj->schema[col - 1].bytes + sizeof(TSCKSUM));
}
int rowsBefore = 0;
......
......@@ -618,7 +618,7 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
dWarn("vid:%d sid:%d id:%s, meter is dropped, abort insert, state:%d", pObj->vnode, pObj->sid, pObj->meterId,
pObj->state);
code = TSDB_CODE_NOT_ACTIVE_SESSION;
code = TSDB_CODE_NOT_ACTIVE_TABLE;
break;
}
......
......@@ -1840,7 +1840,7 @@ static void setCtxTagColumnInfo(SQuery* pQuery, SQueryRuntimeEnv* pRuntimeEnv) {
// ts may be the required primary timestamp column
continue;
} else {
assert(0);
// the column may be the normal column, group by normal_column, the functionId is TSDB_FUNC_PRJ
}
}
......
......@@ -269,7 +269,7 @@ int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj) {
if (pQueryMsg->vnode >= TSDB_MAX_VNODES || pQueryMsg->vnode < 0) {
dTrace("qmsg:%p,vid:%d is out of range", pQueryMsg, pQueryMsg->vnode);
code = TSDB_CODE_INVALID_SESSION_ID;
code = TSDB_CODE_INVALID_TABLE_ID;
goto _query_over;
}
......@@ -278,7 +278,7 @@ int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj) {
if (pVnode->cfg.maxSessions == 0) {
dError("qmsg:%p,vid:%d is not activated yet", pQueryMsg, pQueryMsg->vnode);
vnodeSendVpeerCfgMsg(pQueryMsg->vnode);
code = TSDB_CODE_NOT_ACTIVE_SESSION;
code = TSDB_CODE_NOT_ACTIVE_TABLE;
goto _query_over;
}
......@@ -295,7 +295,7 @@ int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj) {
if (pVnode->meterList == NULL) {
dError("qmsg:%p,vid:%d has been closed", pQueryMsg, pQueryMsg->vnode);
code = TSDB_CODE_NOT_ACTIVE_SESSION;
code = TSDB_CODE_NOT_ACTIVE_VNODE;
goto _query_over;
}
......@@ -305,7 +305,7 @@ int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj) {
dTrace("qmsg:%p sid:%d is out of range, valid range:[%d,%d]", pQueryMsg, pSids[i]->sid, 0,
pVnode->cfg.maxSessions);
code = TSDB_CODE_INVALID_SESSION_ID;
code = TSDB_CODE_INVALID_TABLE_ID;
goto _query_over;
}
}
......@@ -488,7 +488,7 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) {
if (pSubmit->vnode >= TSDB_MAX_VNODES || pSubmit->vnode < 0) {
dTrace("vnode:%d is out of range", pSubmit->vnode);
code = TSDB_CODE_INVALID_SESSION_ID;
code = TSDB_CODE_INVALID_VNODE_ID;
goto _submit_over;
}
......@@ -496,7 +496,7 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) {
if (pVnode->cfg.maxSessions == 0 || pVnode->meterList == NULL) {
dError("vid:%d is not activated for submit", pSubmit->vnode);
vnodeSendVpeerCfgMsg(pSubmit->vnode);
code = TSDB_CODE_NOT_ACTIVE_SESSION;
code = TSDB_CODE_NOT_ACTIVE_VNODE;
goto _submit_over;
}
......@@ -529,7 +529,7 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) {
if (pBlocks->sid >= pVnode->cfg.maxSessions || pBlocks->sid <= 0) {
dTrace("sid:%d is out of range", pBlocks->sid);
code = TSDB_CODE_INVALID_SESSION_ID;
code = TSDB_CODE_INVALID_TABLE_ID;
goto _submit_over;
}
......@@ -538,9 +538,9 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) {
SMeterObj *pMeterObj = vnodeList[vnode].meterList[sid];
if (pMeterObj == NULL) {
dError("vid:%d sid:%d, no active session", vnode, sid);
dError("vid:%d sid:%d, no active table", vnode, sid);
vnodeSendMeterCfgMsg(vnode, sid);
code = TSDB_CODE_NOT_ACTIVE_SESSION;
code = TSDB_CODE_NOT_ACTIVE_TABLE;
goto _submit_over;
}
......@@ -579,7 +579,7 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) {
if (vnodeIsMeterState(pMeterObj, TSDB_METER_STATE_DELETING)) {
dTrace("vid:%d sid:%d id:%s, it is removed, state:%d", pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId,
pMeterObj->state);
code = TSDB_CODE_NOT_ACTIVE_SESSION;
code = TSDB_CODE_NOT_ACTIVE_TABLE;
break;
} else {// waiting for 300ms by default and try again
dTrace("vid:%d sid:%d id:%s, try submit again since in state:%d", pMeterObj->vnode, pMeterObj->sid,
......
......@@ -553,7 +553,7 @@ int32_t vnodeIncQueryRefCount(SQueryMeterMsg* pQueryMsg, SMeterSidExtInfo** pSid
if (pMeter == NULL || (pMeter->state > TSDB_METER_STATE_INSERT)) {
if (pMeter == NULL || vnodeIsMeterState(pMeter, TSDB_METER_STATE_DELETING)) {
code = TSDB_CODE_NOT_ACTIVE_SESSION;
code = TSDB_CODE_NOT_ACTIVE_TABLE;
dError("qmsg:%p, vid:%d sid:%d, not there or will be dropped", pQueryMsg, pQueryMsg->vnode, pSids[i]->sid);
vnodeSendMeterCfgMsg(pQueryMsg->vnode, pSids[i]->sid);
} else {//update or import
......
......@@ -118,7 +118,7 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.10</version>
<version>2.9.10.1</version>
</dependency>
<dependency>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册