Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
bdc235b4
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
bdc235b4
编写于
5月 13, 2020
作者:
H
hjLiao
浏览文件
操作
浏览文件
下载
差异文件
[td-225] merge develop
上级
cfcbefba
09cbf619
变更
25
隐藏空白更改
内联
并排
Showing
25 changed file
with
558 addition
and
416 deletion
+558
-416
src/client/inc/tscUtil.h
src/client/inc/tscUtil.h
+7
-3
src/client/inc/tsclient.h
src/client/inc/tsclient.h
+3
-0
src/client/src/tscAsync.c
src/client/src/tscAsync.c
+7
-7
src/client/src/tscLocal.c
src/client/src/tscLocal.c
+2
-2
src/client/src/tscParseInsert.c
src/client/src/tscParseInsert.c
+2
-2
src/client/src/tscSQLParser.c
src/client/src/tscSQLParser.c
+18
-18
src/client/src/tscServer.c
src/client/src/tscServer.c
+15
-13
src/client/src/tscStream.c
src/client/src/tscStream.c
+1
-1
src/client/src/tscSub.c
src/client/src/tscSub.c
+171
-155
src/client/src/tscSubquery.c
src/client/src/tscSubquery.c
+39
-65
src/client/src/tscUtil.c
src/client/src/tscUtil.c
+45
-12
src/inc/tsdb.h
src/inc/tsdb.h
+4
-2
src/query/inc/queryExecutor.h
src/query/inc/queryExecutor.h
+1
-0
src/query/src/queryExecutor.c
src/query/src/queryExecutor.c
+133
-84
src/tsdb/src/tsdbMain.c
src/tsdb/src/tsdbMain.c
+6
-3
src/tsdb/src/tsdbRead.c
src/tsdb/src/tsdbRead.c
+1
-1
src/util/inc/exception.h
src/util/inc/exception.h
+6
-3
src/util/inc/tarray.h
src/util/inc/tarray.h
+6
-0
src/util/src/exception.c
src/util/src/exception.c
+21
-5
src/util/src/tarray.c
src/util/src/tarray.c
+5
-0
src/util/src/tcache.c
src/util/src/tcache.c
+2
-0
src/vnode/inc/vnodeInt.h
src/vnode/inc/vnodeInt.h
+1
-0
src/vnode/src/vnodeMain.c
src/vnode/src/vnodeMain.c
+17
-10
tests/examples/c/subscribe.c
tests/examples/c/subscribe.c
+44
-29
tests/script/general/cache/new_metrics.sim
tests/script/general/cache/new_metrics.sim
+1
-1
未找到文件。
src/client/inc/tscUtil.h
浏览文件 @
bdc235b4
...
...
@@ -31,13 +31,13 @@ extern "C" {
#include "tscSecondaryMerge.h"
#include "tsclient.h"
#define UTIL_TABLE_IS_SUPERTABLE(metaInfo) \
#define UTIL_TABLE_IS_SUPER
_
TABLE(metaInfo) \
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_SUPER_TABLE))
#define UTIL_TABLE_IS_CHILD_TABLE(metaInfo) \
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_CHILD_TABLE))
#define UTIL_TABLE_IS_NO
MR
AL_TABLE(metaInfo)\
(!(UTIL_TABLE_IS_SUPERTABLE(metaInfo) || UTIL_TABLE_IS_CHILD_TABLE(metaInfo)))
#define UTIL_TABLE_IS_NO
RM
AL_TABLE(metaInfo)\
(!(UTIL_TABLE_IS_SUPER
_
TABLE(metaInfo) || UTIL_TABLE_IS_CHILD_TABLE(metaInfo)))
#define TSDB_COL_IS_TAG(f) (((f)&TSDB_COL_TAG) != 0)
...
...
@@ -265,6 +265,10 @@ void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp);
void
tscAsyncQuerySingleRowForNextVnode
(
void
*
param
,
TAOS_RES
*
tres
,
int
numOfRows
);
void
tscTryQueryNextClause
(
SSqlObj
*
pSql
,
void
(
*
queryFp
)());
void
*
malloc_throw
(
size_t
size
);
void
*
calloc_throw
(
size_t
nmemb
,
size_t
size
);
char
*
strdup_throw
(
const
char
*
str
);
#ifdef __cplusplus
}
#endif
...
...
src/client/inc/tsclient.h
浏览文件 @
bdc235b4
...
...
@@ -438,6 +438,9 @@ extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo);
typedef
void
(
*
__async_cb_func_t
)(
void
*
param
,
TAOS_RES
*
tres
,
int
numOfRows
);
int32_t
tscCompareTidTags
(
const
void
*
p1
,
const
void
*
p2
);
void
tscBuildVgroupTableInfo
(
STableMetaInfo
*
pTableMetaInfo
,
SArray
*
tables
);
#ifdef __cplusplus
}
#endif
...
...
src/client/src/tscAsync.c
浏览文件 @
bdc235b4
...
...
@@ -446,6 +446,11 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
if
((
pQueryInfo
->
type
&
TSDB_QUERY_TYPE_STABLE_SUBQUERY
)
==
TSDB_QUERY_TYPE_STABLE_SUBQUERY
)
{
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
if
(
pTableMetaInfo
->
pTableMeta
==
NULL
){
code
=
tscGetTableMeta
(
pSql
,
pTableMetaInfo
);
assert
(
code
==
TSDB_CODE_SUCCESS
);
}
assert
((
tscGetNumOfTags
(
pTableMetaInfo
->
pTableMeta
)
!=
0
)
&&
pTableMetaInfo
->
vgroupIndex
>=
0
&&
pSql
->
param
!=
NULL
);
SRetrieveSupport
*
trs
=
(
SRetrieveSupport
*
)
pSql
->
param
;
...
...
@@ -454,12 +459,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
assert
(
pParObj
->
signature
==
pParObj
&&
trs
->
subqueryIndex
==
pTableMetaInfo
->
vgroupIndex
&&
tscGetNumOfTags
(
pTableMetaInfo
->
pTableMeta
)
!=
0
);
tscTrace
(
"%p get metricMeta during super table query successfully"
,
pSql
);
code
=
tscGetTableMeta
(
pSql
,
pTableMetaInfo
);
pRes
->
code
=
code
;
if
(
code
==
TSDB_CODE_ACTION_IN_PROGRESS
)
return
;
tscTrace
(
"%p get metricMeta during super table query successfully"
,
pSql
);
code
=
tscGetSTableVgroupInfo
(
pSql
,
0
);
pRes
->
code
=
code
;
...
...
@@ -492,7 +492,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
if
(
code
==
TSDB_CODE_ACTION_IN_PROGRESS
)
return
;
if
(
code
==
TSDB_CODE_SUCCESS
&&
UTIL_TABLE_IS_SUPERTABLE
(
pTableMetaInfo
))
{
if
(
code
==
TSDB_CODE_SUCCESS
&&
UTIL_TABLE_IS_SUPER
_
TABLE
(
pTableMetaInfo
))
{
code
=
tscGetSTableVgroupInfo
(
pSql
,
pCmd
->
clauseIndex
);
pRes
->
code
=
code
;
...
...
src/client/src/tscLocal.c
浏览文件 @
bdc235b4
...
...
@@ -122,7 +122,7 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
int32_t
numOfRows
=
tscGetNumOfColumns
(
pMeta
);
int32_t
totalNumOfRows
=
numOfRows
+
tscGetNumOfTags
(
pMeta
);
if
(
UTIL_TABLE_IS_SUPERTABLE
(
pTableMetaInfo
))
{
if
(
UTIL_TABLE_IS_SUPER
_
TABLE
(
pTableMetaInfo
))
{
numOfRows
=
numOfRows
+
tscGetNumOfTags
(
pMeta
);
}
...
...
@@ -161,7 +161,7 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
}
}
if
(
UTIL_TABLE_IS_SUPERTABLE
(
pTableMetaInfo
))
{
if
(
UTIL_TABLE_IS_SUPER
_
TABLE
(
pTableMetaInfo
))
{
return
0
;
}
...
...
src/client/src/tscParseInsert.c
浏览文件 @
bdc235b4
...
...
@@ -796,7 +796,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
return
code
;
}
if
(
!
UTIL_TABLE_IS_SUPERTABLE
(
pSTableMeterMetaInfo
))
{
if
(
!
UTIL_TABLE_IS_SUPER
_
TABLE
(
pSTableMeterMetaInfo
))
{
return
tscInvalidSQLErrMsg
(
pCmd
->
payload
,
"create table only from super table is allowed"
,
sToken
.
z
);
}
...
...
@@ -1097,7 +1097,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
goto
_error_clean
;
// TODO: should _clean or _error_clean to async flow ????
}
if
(
UTIL_TABLE_IS_SUPERTABLE
(
pTableMetaInfo
))
{
if
(
UTIL_TABLE_IS_SUPER
_
TABLE
(
pTableMetaInfo
))
{
code
=
tscInvalidSQLErrMsg
(
pCmd
->
payload
,
"insert data into super table is not supported"
,
NULL
);
goto
_error_clean
;
}
...
...
src/client/src/tscSQLParser.c
浏览文件 @
bdc235b4
...
...
@@ -1348,7 +1348,7 @@ static int32_t doAddProjectionExprAndResultFields(SQueryInfo* pQueryInfo, SColum
STableComInfo
tinfo
=
tscGetTableInfo
(
pTableMeta
);
if
(
UTIL_TABLE_IS_SUPERTABLE
(
pTableMetaInfo
))
{
if
(
UTIL_TABLE_IS_SUPER
_
TABLE
(
pTableMetaInfo
))
{
numOfTotalColumns
=
tinfo
.
numOfColumns
+
tinfo
.
numOfTags
;
}
else
{
numOfTotalColumns
=
tinfo
.
numOfColumns
;
...
...
@@ -1408,7 +1408,7 @@ int32_t addProjectionExprAndResultField(SQueryInfo* pQueryInfo, tSQLExprItem* pI
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
index
.
tableIndex
);
STableMeta
*
pTableMeta
=
pTableMetaInfo
->
pTableMeta
;
if
(
index
.
columnIndex
>=
tscGetNumOfColumns
(
pTableMeta
)
&&
UTIL_TABLE_IS_NO
MR
AL_TABLE
(
pTableMetaInfo
))
{
if
(
index
.
columnIndex
>=
tscGetNumOfColumns
(
pTableMeta
)
&&
UTIL_TABLE_IS_NO
RM
AL_TABLE
(
pTableMetaInfo
))
{
return
invalidSqlErrMsg
(
pQueryInfo
->
msg
,
msg1
);
}
...
...
@@ -1862,7 +1862,7 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIndex, tSQLExpr
case
TK_TBID
:
{
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
if
(
UTIL_TABLE_IS_NO
MR
AL_TABLE
(
pTableMetaInfo
))
{
if
(
UTIL_TABLE_IS_NO
RM
AL_TABLE
(
pTableMetaInfo
))
{
return
invalidSqlErrMsg
(
pQueryInfo
->
msg
,
msg7
);
}
...
...
@@ -2279,7 +2279,7 @@ bool validateIpAddress(const char* ip, size_t size) {
int32_t
tscTansformSQLFuncForSTableQuery
(
SQueryInfo
*
pQueryInfo
)
{
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
if
(
pTableMetaInfo
->
pTableMeta
==
NULL
||
!
UTIL_TABLE_IS_SUPERTABLE
(
pTableMetaInfo
))
{
if
(
pTableMetaInfo
->
pTableMeta
==
NULL
||
!
UTIL_TABLE_IS_SUPER
_
TABLE
(
pTableMetaInfo
))
{
return
TSDB_CODE_INVALID_SQL
;
}
...
...
@@ -2318,7 +2318,7 @@ int32_t tscTansformSQLFuncForSTableQuery(SQueryInfo* pQueryInfo) {
/* transfer the field-info back to original input format */
void
tscRestoreSQLFuncForSTableQuery
(
SQueryInfo
*
pQueryInfo
)
{
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
if
(
!
UTIL_TABLE_IS_SUPERTABLE
(
pTableMetaInfo
))
{
if
(
!
UTIL_TABLE_IS_SUPER
_
TABLE
(
pTableMetaInfo
))
{
return
;
}
...
...
@@ -2542,7 +2542,7 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, tVariantList* pList, SSqlCmd*
}
if
(
groupTag
)
{
if
(
!
UTIL_TABLE_IS_SUPERTABLE
(
pTableMetaInfo
))
{
if
(
!
UTIL_TABLE_IS_SUPER
_
TABLE
(
pTableMetaInfo
))
{
return
invalidSqlErrMsg
(
pQueryInfo
->
msg
,
msg9
);
}
...
...
@@ -3254,7 +3254,7 @@ static bool validateJoinExprNode(SQueryInfo* pQueryInfo, tSQLExpr* pExpr, SColum
}
// table to table/ super table to super table are allowed
if
(
UTIL_TABLE_IS_SUPER
TABLE
(
pLeftMeterMeta
)
!=
UTIL_TABLE_IS_SUPER
TABLE
(
pRightMeterMeta
))
{
if
(
UTIL_TABLE_IS_SUPER
_TABLE
(
pLeftMeterMeta
)
!=
UTIL_TABLE_IS_SUPER_
TABLE
(
pRightMeterMeta
))
{
invalidSqlErrMsg
(
pQueryInfo
->
msg
,
msg5
);
return
false
;
}
...
...
@@ -3337,7 +3337,7 @@ static int32_t handleExprInQueryCond(SQueryInfo* pQueryInfo, tSQLExpr** pExpr, S
}
else
if
(
index
.
columnIndex
>=
tscGetNumOfColumns
(
pTableMeta
)
||
index
.
columnIndex
==
TSDB_TBNAME_COLUMN_INDEX
)
{
// query on tags
// check for tag query condition
if
(
UTIL_TABLE_IS_NO
MR
AL_TABLE
(
pTableMetaInfo
))
{
if
(
UTIL_TABLE_IS_NO
RM
AL_TABLE
(
pTableMetaInfo
))
{
return
invalidSqlErrMsg
(
pQueryInfo
->
msg
,
msg1
);
}
...
...
@@ -3697,7 +3697,7 @@ static int32_t validateJoinExpr(SQueryInfo* pQueryInfo, SCondExpr* pCondExpr) {
}
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
if
(
UTIL_TABLE_IS_SUPERTABLE
(
pTableMetaInfo
))
{
// for stable join, tag columns
if
(
UTIL_TABLE_IS_SUPER
_
TABLE
(
pTableMetaInfo
))
{
// for stable join, tag columns
// must be present for join
if
(
pCondExpr
->
pJoinExpr
==
NULL
)
{
return
invalidSqlErrMsg
(
pQueryInfo
->
msg
,
msg1
);
...
...
@@ -3735,7 +3735,7 @@ static void cleanQueryExpr(SCondExpr* pCondExpr) {
static
void
doAddJoinTagsColumnsIntoTagList
(
SQueryInfo
*
pQueryInfo
,
SCondExpr
*
pCondExpr
)
{
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
if
(
QUERY_IS_JOIN_QUERY
(
pQueryInfo
->
type
)
&&
UTIL_TABLE_IS_SUPERTABLE
(
pTableMetaInfo
))
{
if
(
QUERY_IS_JOIN_QUERY
(
pQueryInfo
->
type
)
&&
UTIL_TABLE_IS_SUPER
_
TABLE
(
pTableMetaInfo
))
{
SColumnIndex
index
=
{
0
};
getColumnIndexByName
(
&
pCondExpr
->
pJoinExpr
->
pLeft
->
colInfo
,
pQueryInfo
,
&
index
);
...
...
@@ -4102,7 +4102,7 @@ static void setDefaultOrderInfo(SQueryInfo* pQueryInfo) {
}
/* for super table query, set default ascending order for group output */
if
(
UTIL_TABLE_IS_SUPERTABLE
(
pTableMetaInfo
))
{
if
(
UTIL_TABLE_IS_SUPER
_
TABLE
(
pTableMetaInfo
))
{
pQueryInfo
->
groupbyExpr
.
orderType
=
TSDB_ORDER_ASC
;
}
}
...
...
@@ -4128,7 +4128,7 @@ int32_t parseOrderbyClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql, SSchema
*
* for super table query, the order option must be less than 3.
*/
if
(
UTIL_TABLE_IS_NO
MR
AL_TABLE
(
pTableMetaInfo
))
{
if
(
UTIL_TABLE_IS_NO
RM
AL_TABLE
(
pTableMetaInfo
))
{
if
(
pSortorder
->
nExpr
>
1
)
{
return
invalidSqlErrMsg
(
pQueryInfo
->
msg
,
msg0
);
}
...
...
@@ -4149,7 +4149,7 @@ int32_t parseOrderbyClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql, SSchema
SSQLToken
columnName
=
{
pVar
->
nLen
,
pVar
->
nType
,
pVar
->
pz
};
SColumnIndex
index
=
{
0
};
if
(
UTIL_TABLE_IS_SUPERTABLE
(
pTableMetaInfo
))
{
// super table query
if
(
UTIL_TABLE_IS_SUPER
_
TABLE
(
pTableMetaInfo
))
{
// super table query
if
(
getColumnIndexByName
(
&
columnName
,
pQueryInfo
,
&
index
)
!=
TSDB_CODE_SUCCESS
)
{
return
invalidSqlErrMsg
(
pQueryInfo
->
msg
,
msg1
);
}
...
...
@@ -4302,10 +4302,10 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
if
(
pAlterSQL
->
type
==
TSDB_ALTER_TABLE_ADD_TAG_COLUMN
||
pAlterSQL
->
type
==
TSDB_ALTER_TABLE_DROP_TAG_COLUMN
||
pAlterSQL
->
type
==
TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN
)
{
if
(
UTIL_TABLE_IS_NO
MR
AL_TABLE
(
pTableMetaInfo
))
{
if
(
UTIL_TABLE_IS_NO
RM
AL_TABLE
(
pTableMetaInfo
))
{
return
invalidSqlErrMsg
(
pQueryInfo
->
msg
,
msg3
);
}
}
else
if
((
pAlterSQL
->
type
==
TSDB_ALTER_TABLE_UPDATE_TAG_VAL
)
&&
(
UTIL_TABLE_IS_SUPERTABLE
(
pTableMetaInfo
)))
{
}
else
if
((
pAlterSQL
->
type
==
TSDB_ALTER_TABLE_UPDATE_TAG_VAL
)
&&
(
UTIL_TABLE_IS_SUPER
_
TABLE
(
pTableMetaInfo
)))
{
return
invalidSqlErrMsg
(
pQueryInfo
->
msg
,
msg4
);
}
else
if
((
pAlterSQL
->
type
==
TSDB_ALTER_TABLE_ADD_COLUMN
||
pAlterSQL
->
type
==
TSDB_ALTER_TABLE_DROP_COLUMN
)
&&
UTIL_TABLE_IS_CHILD_TABLE
(
pTableMetaInfo
))
{
...
...
@@ -4691,7 +4691,7 @@ int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t clauseIndex, SQuerySQL*
}
// todo refactor
if
(
UTIL_TABLE_IS_SUPERTABLE
(
pTableMetaInfo
))
{
if
(
UTIL_TABLE_IS_SUPER
_
TABLE
(
pTableMetaInfo
))
{
if
(
!
tscQueryTags
(
pQueryInfo
))
{
// local handle the super table tag query
if
(
tscIsProjectionQueryOnSTable
(
pQueryInfo
,
0
))
{
if
(
pQueryInfo
->
slimit
.
limit
>
0
||
pQueryInfo
->
slimit
.
offset
>
0
)
{
...
...
@@ -5627,7 +5627,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
return
code
;
}
bool
isSTable
=
UTIL_TABLE_IS_SUPERTABLE
(
pTableMetaInfo
);
bool
isSTable
=
UTIL_TABLE_IS_SUPER
_
TABLE
(
pTableMetaInfo
);
if
(
parseSelectClause
(
&
pSql
->
cmd
,
0
,
pQuerySql
->
pSelection
,
isSTable
)
!=
TSDB_CODE_SUCCESS
)
{
return
TSDB_CODE_INVALID_SQL
;
}
...
...
@@ -5771,7 +5771,7 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
assert
(
pQueryInfo
->
numOfTables
==
pQuerySql
->
from
->
nExpr
);
bool
isSTable
=
false
;
if
(
UTIL_TABLE_IS_SUPERTABLE
(
pTableMetaInfo
))
{
if
(
UTIL_TABLE_IS_SUPER
_
TABLE
(
pTableMetaInfo
))
{
isSTable
=
true
;
code
=
tscGetSTableVgroupInfo
(
pSql
,
index
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
src/client/src/tscServer.c
浏览文件 @
bdc235b4
...
...
@@ -39,7 +39,7 @@ int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql);
void
tscProcessActivityTimer
(
void
*
handle
,
void
*
tmrId
);
int
tscKeepConn
[
TSDB_SQL_MAX
]
=
{
0
};
TSKEY
tscGetSubscriptionProgress
(
void
*
sub
,
int64_t
uid
);
TSKEY
tscGetSubscriptionProgress
(
void
*
sub
,
int64_t
uid
,
TSKEY
dflt
);
void
tscUpdateSubscriptionProgress
(
void
*
sub
,
int64_t
uid
,
TSKEY
ts
);
void
tscSaveSubscriptionProgress
(
void
*
sub
);
...
...
@@ -500,7 +500,7 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
// todo valid the vgroupId at the client side
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
if
(
UTIL_TABLE_IS_SUPERTABLE
(
pTableMetaInfo
))
{
if
(
UTIL_TABLE_IS_SUPER
_
TABLE
(
pTableMetaInfo
))
{
int32_t
vgIndex
=
pTableMetaInfo
->
vgroupIndex
;
SVgroupsInfo
*
pVgroupInfo
=
pTableMetaInfo
->
vgroupList
;
...
...
@@ -566,12 +566,13 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
static
char
*
doSerializeTableInfo
(
SQueryTableMsg
*
pQueryMsg
,
SSqlObj
*
pSql
,
char
*
pMsg
)
{
STableMetaInfo
*
pTableMetaInfo
=
tscGetTableMetaInfoFromCmd
(
&
pSql
->
cmd
,
pSql
->
cmd
.
clauseIndex
,
0
);
TSKEY
dfltKey
=
htobe64
(
pQueryMsg
->
window
.
skey
);
STableMeta
*
pTableMeta
=
pTableMetaInfo
->
pTableMeta
;
if
(
UTIL_TABLE_IS_NO
MR
AL_TABLE
(
pTableMetaInfo
)
||
pTableMetaInfo
->
pVgroupTables
==
NULL
)
{
if
(
UTIL_TABLE_IS_NO
RM
AL_TABLE
(
pTableMetaInfo
)
||
pTableMetaInfo
->
pVgroupTables
==
NULL
)
{
SCMVgroupInfo
*
pVgroupInfo
=
NULL
;
if
(
UTIL_TABLE_IS_SUPERTABLE
(
pTableMetaInfo
))
{
if
(
UTIL_TABLE_IS_SUPER
_
TABLE
(
pTableMetaInfo
))
{
int32_t
index
=
pTableMetaInfo
->
vgroupIndex
;
assert
(
index
>=
0
);
...
...
@@ -580,25 +581,25 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
}
else
{
pVgroupInfo
=
&
pTableMeta
->
vgroupInfo
;
}
tscSetDnodeIpList
(
pSql
,
pVgroupInfo
);
pQueryMsg
->
head
.
vgId
=
htonl
(
pVgroupInfo
->
vgId
);
STableIdInfo
*
pTableIdInfo
=
(
STableIdInfo
*
)
pMsg
;
pTableIdInfo
->
tid
=
htonl
(
pTableMeta
->
sid
);
pTableIdInfo
->
uid
=
htobe64
(
pTableMeta
->
uid
);
pTableIdInfo
->
key
=
htobe64
(
tscGetSubscriptionProgress
(
pSql
->
pSubscription
,
pTableMeta
->
uid
));
pTableIdInfo
->
key
=
htobe64
(
tscGetSubscriptionProgress
(
pSql
->
pSubscription
,
pTableMeta
->
uid
,
dfltKey
));
pQueryMsg
->
numOfTables
=
htonl
(
1
);
// set the number of tables
pMsg
+=
sizeof
(
STableIdInfo
);
}
else
{
int32_t
index
=
pTableMetaInfo
->
vgroupIndex
;
int32_t
numOfVgroups
=
taosArrayGetSize
(
pTableMetaInfo
->
pVgroupTables
);
assert
(
index
>=
0
&&
index
<
numOfVgroups
);
tscTrace
(
"%p query on stable, vgIndex:%d, numOfVgroups:%d"
,
pSql
,
index
,
numOfVgroups
);
SVgroupTableInfo
*
pTableIdList
=
taosArrayGet
(
pTableMetaInfo
->
pVgroupTables
,
index
);
// set the vgroup info
...
...
@@ -615,7 +616,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
STableIdInfo
*
pTableIdInfo
=
(
STableIdInfo
*
)
pMsg
;
pTableIdInfo
->
tid
=
htonl
(
pItem
->
tid
);
pTableIdInfo
->
uid
=
htobe64
(
pItem
->
uid
);
// pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->uid
));
pTableIdInfo
->
key
=
htobe64
(
tscGetSubscriptionProgress
(
pSql
->
pSubscription
,
pItem
->
uid
,
dfltKey
));
pMsg
+=
sizeof
(
STableIdInfo
);
}
}
...
...
@@ -2284,7 +2285,7 @@ int tscProcessAlterTableMsgRsp(SSqlObj *pSql) {
taosCacheRelease
(
tscCacheHandle
,
(
void
**
)
&
pTableMeta
,
true
);
if
(
pTableMetaInfo
->
pTableMeta
)
{
bool
isSuperTable
=
UTIL_TABLE_IS_SUPERTABLE
(
pTableMetaInfo
);
bool
isSuperTable
=
UTIL_TABLE_IS_SUPER
_
TABLE
(
pTableMetaInfo
);
taosCacheRelease
(
tscCacheHandle
,
(
void
**
)
&
(
pTableMetaInfo
->
pTableMeta
),
true
);
// taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pMetricMeta), true);
...
...
@@ -2346,6 +2347,7 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
for
(
int
i
=
0
;
i
<
numOfTables
;
i
++
)
{
int64_t
uid
=
htobe64
(
*
(
int64_t
*
)
p
);
p
+=
sizeof
(
int64_t
);
p
+=
sizeof
(
int32_t
);
// skip tid
TSKEY
key
=
htobe64
(
*
(
TSKEY
*
)
p
);
p
+=
sizeof
(
TSKEY
);
tscUpdateSubscriptionProgress
(
pSql
->
pSubscription
,
uid
,
key
);
...
...
src/client/src/tscStream.c
浏览文件 @
bdc235b4
...
...
@@ -79,7 +79,7 @@ static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) {
if
(
code
==
TSDB_CODE_ACTION_IN_PROGRESS
)
return
;
if
(
code
==
0
&&
UTIL_TABLE_IS_SUPERTABLE
(
pTableMetaInfo
))
{
if
(
code
==
0
&&
UTIL_TABLE_IS_SUPER
_
TABLE
(
pTableMetaInfo
))
{
code
=
tscGetSTableVgroupInfo
(
pSql
,
0
);
pSql
->
res
.
code
=
code
;
...
...
src/client/src/tscSub.c
浏览文件 @
bdc235b4
...
...
@@ -44,8 +44,7 @@ typedef struct SSub {
int
interval
;
TAOS_SUBSCRIBE_CALLBACK
fp
;
void
*
param
;
int
numOfTables
;
SSubscriptionProgress
*
progress
;
SArray
*
progress
;
}
SSub
;
...
...
@@ -57,92 +56,113 @@ static int tscCompareSubscriptionProgress(const void* a, const void* b) {
return
0
;
}
TSKEY
tscGetSubscriptionProgress
(
void
*
sub
,
int64_t
uid
)
{
if
(
sub
==
NULL
)
return
0
;
SSub
*
pSub
=
(
SSub
*
)
sub
;
for
(
int
s
=
0
,
e
=
pSub
->
numOfTables
;
s
<
e
;)
{
int
m
=
(
s
+
e
)
/
2
;
SSubscriptionProgress
*
p
=
pSub
->
progress
+
m
;
if
(
p
->
uid
>
uid
)
e
=
m
;
else
if
(
p
->
uid
<
uid
)
s
=
m
+
1
;
else
return
p
->
key
;
TSKEY
tscGetSubscriptionProgress
(
void
*
sub
,
int64_t
uid
,
TSKEY
dflt
)
{
if
(
sub
==
NULL
)
{
return
dflt
;
}
SSub
*
pSub
=
(
SSub
*
)
sub
;
return
0
;
SSubscriptionProgress
target
=
{.
uid
=
uid
,
.
key
=
0
};
SSubscriptionProgress
*
p
=
taosArraySearch
(
pSub
->
progress
,
tscCompareSubscriptionProgress
,
&
target
);
if
(
p
==
NULL
)
{
return
dflt
;
}
return
p
->
key
;
}
void
tscUpdateSubscriptionProgress
(
void
*
sub
,
int64_t
uid
,
TSKEY
ts
)
{
if
(
sub
==
NULL
)
return
;
SSub
*
pSub
=
(
SSub
*
)
sub
;
for
(
int
s
=
0
,
e
=
pSub
->
numOfTables
;
s
<
e
;)
{
int
m
=
(
s
+
e
)
/
2
;
SSubscriptionProgress
*
p
=
pSub
->
progress
+
m
;
if
(
p
->
uid
>
uid
)
e
=
m
;
else
if
(
p
->
uid
<
uid
)
s
=
m
+
1
;
else
{
if
(
ts
>=
p
->
key
)
p
->
key
=
ts
;
break
;
}
SSubscriptionProgress
target
=
{.
uid
=
uid
,
.
key
=
ts
};
SSubscriptionProgress
*
p
=
taosArraySearch
(
pSub
->
progress
,
tscCompareSubscriptionProgress
,
&
target
);
if
(
p
!=
NULL
)
{
p
->
key
=
ts
;
}
}
static
void
asyncCallback
(
void
*
param
,
TAOS_RES
*
tres
,
int
code
)
{
assert
(
param
!=
NULL
);
SSqlObj
*
pSql
=
((
SSqlObj
*
)
param
);
pSql
->
res
.
code
=
code
;
sem_post
(
&
pSql
->
rspSem
);
}
static
SSub
*
tscCreateSubscription
(
STscObj
*
pObj
,
const
char
*
topic
,
const
char
*
sql
)
{
SSub
*
pSub
=
calloc
(
1
,
sizeof
(
SSub
));
if
(
pSub
==
NULL
)
{
terrno
=
TSDB_CODE_CLI_OUT_OF_MEMORY
;
tscError
(
"failed to allocate memory for subscription"
);
return
NULL
;
}
SSub
*
pSub
=
NULL
;
SSqlObj
*
pSql
=
calloc
(
1
,
sizeof
(
SSqlObj
));
if
(
pSql
==
NULL
)
{
terrno
=
TSDB_CODE_CLI_OUT_OF_MEMORY
;
tscError
(
"failed to allocate SSqlObj for subscription"
);
goto
_pSql_failed
;
}
TRY
(
8
)
{
SSqlObj
*
pSql
=
calloc_throw
(
1
,
sizeof
(
SSqlObj
));
CLEANUP_PUSH_FREE
(
true
,
pSql
);
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SSqlRes
*
pRes
=
&
pSql
->
res
;
if
(
tsem_init
(
&
pSql
->
rspSem
,
0
,
0
)
==
-
1
)
{
THROW
(
TAOS_SYSTEM_ERROR
(
errno
));
}
CLEANUP_PUSH_INT_PTR
(
true
,
tsem_destroy
,
&
pSql
->
rspSem
);
pSql
->
signature
=
pSql
;
pSql
->
pTscObj
=
pObj
;
pSql
->
signature
=
pSql
;
pSql
->
param
=
pSql
;
pSql
->
pTscObj
=
pObj
;
pSql
->
maxRetry
=
TSDB_MAX_REPLICA_NUM
;
pSql
->
fp
=
asyncCallback
;
char
*
sqlstr
=
(
char
*
)
malloc
(
strlen
(
sql
)
+
1
);
if
(
sqlstr
==
NULL
)
{
tscError
(
"failed to allocate sql string for subscription"
);
goto
failed
;
}
strcpy
(
sqlstr
,
sql
);
strtolower
(
sqlstr
,
sqlstr
);
pSql
->
sqlstr
=
sqlstr
;
int
code
=
tscAllocPayload
(
pCmd
,
TSDB_DEFAULT_PAYLOAD_SIZE
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
THROW
(
code
);
}
CLEANUP_PUSH_FREE
(
true
,
pCmd
->
payload
);
tsem_init
(
&
pSql
->
rspSem
,
0
,
0
);
pRes
->
qhandle
=
0
;
pRes
->
numOfRows
=
1
;
SSqlRes
*
pRes
=
&
pSql
->
res
;
pRes
->
numOfRows
=
1
;
pRes
->
numOfTotal
=
0
;
pSql
->
pSubscription
=
pSub
;
pSub
->
pSql
=
pSql
;
pSub
->
signature
=
pSub
;
strncpy
(
pSub
->
topic
,
topic
,
sizeof
(
pSub
->
topic
));
pSub
->
topic
[
sizeof
(
pSub
->
topic
)
-
1
]
=
0
;
return
pSub
;
pSql
->
sqlstr
=
strdup_throw
(
sql
);
CLEANUP_PUSH_FREE
(
true
,
pSql
->
sqlstr
);
strtolower
(
pSql
->
sqlstr
,
pSql
->
sqlstr
);
code
=
tsParseSql
(
pSql
,
false
);
if
(
code
==
TSDB_CODE_ACTION_IN_PROGRESS
)
{
// wait for the callback function to post the semaphore
sem_wait
(
&
pSql
->
rspSem
);
code
=
pSql
->
res
.
code
;
}
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tscError
(
"failed to parse sql statement: %s, error: %s"
,
pSub
->
topic
,
tstrerror
(
code
));
THROW
(
code
);
}
if
(
pSql
->
cmd
.
command
!=
TSDB_SQL_SELECT
)
{
tscError
(
"only 'select' statement is allowed in subscription: %s"
,
pSub
->
topic
);
THROW
(
-
1
);
// TODO
}
pSub
=
calloc_throw
(
1
,
sizeof
(
SSub
));
CLEANUP_PUSH_FREE
(
true
,
pSub
);
pSql
->
pSubscription
=
pSub
;
pSub
->
pSql
=
pSql
;
pSub
->
signature
=
pSub
;
strncpy
(
pSub
->
topic
,
topic
,
sizeof
(
pSub
->
topic
));
pSub
->
topic
[
sizeof
(
pSub
->
topic
)
-
1
]
=
0
;
pSub
->
progress
=
taosArrayInit
(
32
,
sizeof
(
SSubscriptionProgress
));
if
(
pSub
->
progress
==
NULL
)
{
THROW
(
TSDB_CODE_CLI_OUT_OF_MEMORY
);
}
CLEANUP_EXECUTE
();
failed:
tfree
(
sqlstr
);
}
CATCH
(
code
)
{
tscError
(
"failed to create subscription object: %s"
,
tstrerror
(
code
));
CLEANUP_EXECUTE
();
pSub
=
NULL
;
_pSql_failed:
tfree
(
pSql
);
tfree
(
pSub
);
return
NULL
;
}
END_TRY
return
pSub
;
}
...
...
@@ -159,60 +179,68 @@ static void tscProcessSubscriptionTimer(void *handle, void *tmrId) {
}
int
tscUpdateSubscription
(
STscObj
*
pObj
,
SSub
*
pSub
)
{
int
code
=
(
uint8_t
)
tsParseSql
(
pSub
->
pSql
,
false
);
static
SArray
*
getTableList
(
SSqlObj
*
pSql
)
{
const
char
*
p
=
strstr
(
pSql
->
sqlstr
,
" from "
);
char
*
sql
=
alloca
(
strlen
(
p
)
+
32
);
sprintf
(
sql
,
"select tbid(tbname)%s"
,
p
);
int
code
=
taos_query
(
pSql
->
pTscObj
,
sql
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tscError
(
"failed to
parse sql statement: %s"
,
pSub
->
topic
);
return
0
;
tscError
(
"failed to
retrieve table id: %s"
,
tstrerror
(
code
)
);
return
NULL
;
}
SSqlCmd
*
pCmd
=
&
pSub
->
pSql
->
cmd
;
if
(
pCmd
->
command
!=
TSDB_SQL_SELECT
)
{
tscError
(
"only 'select' statement is allowed in subscription: %s"
,
pSub
->
topic
);
return
0
;
TAOS_RES
*
res
=
taos_use_result
(
pSql
->
pTscObj
);
TAOS_ROW
row
;
SArray
*
result
=
taosArrayInit
(
128
,
sizeof
(
STidTags
)
);
while
((
row
=
taos_fetch_row
(
res
)))
{
STidTags
tags
;
memcpy
(
&
tags
,
row
[
0
],
sizeof
(
tags
));
taosArrayPush
(
result
,
&
tags
);
}
STableMetaInfo
*
pTableMetaInfo
=
tscGetTableMetaInfoFromCmd
(
pCmd
,
0
,
0
);
int
numOfTables
=
0
;
if
(
!
UTIL_TABLE_IS_NOMRAL_TABLE
(
pTableMetaInfo
))
{
// SSuperTableMeta* pMetricMeta = pTableMetaInfo->pMetricMeta;
// for (int32_t i = 0; i < pMetricMeta->numOfVnodes; i++) {
// SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, i);
// numOfTables += pVnodeSidList->numOfSids;
// }
}
return
result
;
}
SSubscriptionProgress
*
progress
=
(
SSubscriptionProgress
*
)
calloc
(
numOfTables
,
sizeof
(
SSubscriptionProgress
));
if
(
progress
==
NULL
)
{
tscError
(
"failed to allocate memory for progress: %s"
,
pSub
->
topic
);
return
0
;
static
int
tscUpdateSubscription
(
STscObj
*
pObj
,
SSub
*
pSub
)
{
SSqlObj
*
pSql
=
pSub
->
pSql
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
pSub
->
lastSyncTime
=
taosGetTimestampMs
();
STableMetaInfo
*
pTableMetaInfo
=
tscGetTableMetaInfoFromCmd
(
pCmd
,
pCmd
->
clauseIndex
,
0
);
if
(
UTIL_TABLE_IS_NORMAL_TABLE
(
pTableMetaInfo
))
{
STableMeta
*
pTableMeta
=
pTableMetaInfo
->
pTableMeta
;
SSubscriptionProgress
target
=
{.
uid
=
pTableMeta
->
uid
,
.
key
=
0
};
SSubscriptionProgress
*
p
=
taosArraySearch
(
pSub
->
progress
,
tscCompareSubscriptionProgress
,
&
target
);
if
(
p
==
NULL
)
{
taosArrayClear
(
pSub
->
progress
);
taosArrayPush
(
pSub
->
progress
,
&
target
);
}
return
1
;
}
if
(
UTIL_TABLE_IS_NOMRAL_TABLE
(
pTableMetaInfo
))
{
numOfTables
=
1
;
int64_t
uid
=
pTableMetaInfo
->
pTableMeta
->
uid
;
progress
[
0
].
uid
=
uid
;
progress
[
0
].
key
=
tscGetSubscriptionProgress
(
pSub
,
uid
);
}
else
{
// SSuperTableMeta* pMetricMeta = pTableMetaInfo->pMetricMeta;
// numOfTables = 0;
// for (int32_t i = 0; i < pMetricMeta->numOfVnodes; i++) {
// SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, i);
// for (int32_t j = 0; j < pVnodeSidList->numOfSids; j++) {
// STableIdInfo *pTableMetaInfo = tscGetMeterSidInfo(pVnodeSidList, j);
// int64_t uid = pTableMetaInfo->uid;
// progress[numOfTables].uid = uid;
// progress[numOfTables++].key = tscGetSubscriptionProgress(pSub, uid);
// }
// }
qsort
(
progress
,
numOfTables
,
sizeof
(
SSubscriptionProgress
),
tscCompareSubscriptionProgress
);
SArray
*
tables
=
getTableList
(
pSql
);
size_t
numOfTables
=
taosArrayGetSize
(
tables
);
SArray
*
progress
=
taosArrayInit
(
numOfTables
,
sizeof
(
SSubscriptionProgress
));
for
(
size_t
i
=
0
;
i
<
numOfTables
;
i
++
)
{
STidTags
*
tt
=
taosArrayGet
(
tables
,
i
);
SSubscriptionProgress
p
=
{
.
uid
=
tt
->
uid
};
p
.
key
=
tscGetSubscriptionProgress
(
pSub
,
tt
->
uid
,
INT64_MIN
);
taosArrayPush
(
progress
,
&
p
);
}
taosArraySort
(
progress
,
tscCompareSubscriptionProgress
);
free
(
pSub
->
progress
);
pSub
->
numOfTables
=
numOfTables
;
taosArrayDestroy
(
pSub
->
progress
);
pSub
->
progress
=
progress
;
pSub
->
lastSyncTime
=
taosGetTimestampMs
();
if
(
UTIL_TABLE_IS_SUPER_TABLE
(
pTableMetaInfo
))
{
taosArraySort
(
tables
,
tscCompareTidTags
);
tscBuildVgroupTableInfo
(
pTableMetaInfo
,
tables
);
}
taosArrayDestroy
(
tables
);
return
1
;
}
...
...
@@ -248,32 +276,22 @@ static int tscLoadSubscriptionProgress(SSub* pSub) {
return
0
;
}
if
(
fgets
(
buf
,
sizeof
(
buf
),
fp
)
==
NULL
||
atoi
(
buf
)
<
0
)
{
tscTrace
(
"invalid subscription progress file: %s"
,
pSub
->
topic
);
fclose
(
fp
);
return
0
;
}
int
numOfTables
=
atoi
(
buf
);
SSubscriptionProgress
*
progress
=
calloc
(
numOfTables
,
sizeof
(
SSubscriptionProgress
));
for
(
int
i
=
0
;
i
<
numOfTables
;
i
++
)
{
SArray
*
progress
=
pSub
->
progress
;
taosArrayClear
(
progress
);
while
(
1
)
{
if
(
fgets
(
buf
,
sizeof
(
buf
),
fp
)
==
NULL
)
{
fclose
(
fp
);
free
(
progress
);
return
0
;
}
int64_t
uid
,
key
;
sscanf
(
buf
,
"%"
SCNd64
":%"
SCNd64
,
&
uid
,
&
key
);
progress
[
i
].
uid
=
uid
;
progress
[
i
].
key
=
key
;
SSubscriptionProgress
p
;
sscanf
(
buf
,
"%"
SCNd64
":%"
SCNd64
,
&
p
.
uid
,
&
p
.
key
);
taosArrayPush
(
progress
,
&
p
);
}
fclose
(
fp
);
qsort
(
progress
,
numOfTables
,
sizeof
(
SSubscriptionProgress
),
tscCompareSubscriptionProgress
);
pSub
->
numOfTables
=
numOfTables
;
pSub
->
progress
=
progress
;
tscTrace
(
"subscription progress loaded, %d tables: %s"
,
numOfTables
,
pSub
->
topic
);
taosArraySort
(
progress
,
tscCompareSubscriptionProgress
);
tscTrace
(
"subscription progress loaded, %d tables: %s"
,
taosArrayGetSize
(
progress
),
pSub
->
topic
);
return
1
;
}
...
...
@@ -294,11 +312,10 @@ void tscSaveSubscriptionProgress(void* sub) {
}
fputs
(
pSub
->
pSql
->
sqlstr
,
fp
);
fprintf
(
fp
,
"
\n
%d
\n
"
,
pSub
->
numOfTables
);
for
(
int
i
=
0
;
i
<
pSub
->
numOfTables
;
i
++
)
{
int64_t
uid
=
pSub
->
progress
[
i
].
uid
;
TSKEY
key
=
pSub
->
progress
[
i
].
key
;
fprintf
(
fp
,
"%"
PRId64
":%"
PRId64
"
\n
"
,
uid
,
key
);
fprintf
(
fp
,
"
\n
"
);
for
(
size_t
i
=
0
;
i
<
taosArrayGetSize
(
pSub
->
progress
);
i
++
)
{
SSubscriptionProgress
*
p
=
taosArrayGet
(
pSub
->
progress
,
i
);
fprintf
(
fp
,
"%"
PRId64
":%"
PRId64
"
\n
"
,
p
->
uid
,
p
->
key
);
}
fclose
(
fp
);
...
...
@@ -363,35 +380,34 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
tscRemoveFromSqlList
(
pSql
);
if
(
taosGetTimestampMs
()
-
pSub
->
lastSyncTime
>
10
*
60
*
1000
)
{
tscTrace
(
"begin meter synchronization"
);
char
*
sqlstr
=
pSql
->
sqlstr
;
pSql
->
sqlstr
=
NULL
;
taos_free_result_imp
(
pSql
,
0
);
pSql
->
sqlstr
=
sqlstr
;
taosCacheEmpty
(
tscCacheHandle
);
tscTrace
(
"begin table synchronization"
);
if
(
!
tscUpdateSubscription
(
pSub
->
taos
,
pSub
))
return
NULL
;
tscTrace
(
"meter synchronization completed"
);
}
else
{
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
uint32_t
type
=
pQueryInfo
->
type
;
taos_free_result_imp
(
pSql
,
1
);
pRes
->
numOfRows
=
1
;
pRes
->
numOfTotal
=
0
;
pRes
->
qhandle
=
0
;
pSql
->
cmd
.
command
=
TSDB_SQL_SELECT
;
pQueryInfo
->
type
=
type
;
tscGetTableMetaInfoFromCmd
(
&
pSql
->
cmd
,
0
,
0
)
->
vgroupIndex
=
0
;
tscTrace
(
"table synchronization completed"
);
}
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
uint32_t
type
=
pQueryInfo
->
type
;
tscFreeSqlResult
(
pSql
);
pRes
->
numOfRows
=
1
;
pRes
->
qhandle
=
0
;
pSql
->
cmd
.
command
=
TSDB_SQL_SELECT
;
pQueryInfo
->
type
=
type
;
tscGetTableMetaInfoFromCmd
(
&
pSql
->
cmd
,
0
,
0
)
->
vgroupIndex
=
0
;
pSql
->
fp
=
asyncCallback
;
pSql
->
param
=
pSql
;
tscDoQuery
(
pSql
);
if
(
pRes
->
code
!=
TSDB_CODE_NOT_ACTIVE_TABLE
)
{
break
;
sem_wait
(
&
pSql
->
rspSem
);
if
(
pRes
->
code
!=
TSDB_CODE_SUCCESS
)
{
continue
;
}
// meter was removed, make sync time zero, so that next retry will
// do synchronization first
pSub
->
lastSyncTime
=
0
;
break
;
}
if
(
pRes
->
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -421,7 +437,7 @@ void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress) {
}
tscFreeSqlObj
(
pSub
->
pSql
);
free
(
pSub
->
progress
);
taosArrayDestroy
(
pSub
->
progress
);
memset
(
pSub
,
0
,
sizeof
(
*
pSub
));
free
(
pSub
);
}
src/client/src/tscSubquery.c
浏览文件 @
bdc235b4
...
...
@@ -330,7 +330,7 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
pNewQueryInfo
->
limit
=
pSupporter
->
limit
;
// fetch the join tag column
if
(
UTIL_TABLE_IS_SUPERTABLE
(
pTableMetaInfo
))
{
if
(
UTIL_TABLE_IS_SUPER
_
TABLE
(
pTableMetaInfo
))
{
SSqlExpr
*
pExpr
=
tscSqlExprGet
(
pNewQueryInfo
,
0
);
assert
(
pQueryInfo
->
tagCond
.
joinInfo
.
hasJoin
);
...
...
@@ -463,77 +463,51 @@ static void tSIntersectionAndLaunchSecQuery(SJoinSupporter* pSupporter, SSqlObj*
}
}
int32_t
t
agsOrderCompar
(
const
void
*
p1
,
const
void
*
p2
)
{
STidTags
*
t1
=
(
STidTags
*
)
p1
;
STidTags
*
t2
=
(
STidTags
*
)
p2
;
int32_t
t
scCompareTidTags
(
const
void
*
p1
,
const
void
*
p2
)
{
const
STidTags
*
t1
=
(
const
STidTags
*
)
p1
;
const
STidTags
*
t2
=
(
const
STidTags
*
)
p2
;
if
(
t1
->
vgId
!=
t2
->
vgId
)
{
return
(
t1
->
vgId
>
t2
->
vgId
)
?
1
:-
1
;
}
else
{
if
(
t1
->
tid
!=
t2
->
tid
)
{
return
(
t1
->
tid
>
t2
->
tid
)
?
1
:-
1
;
}
else
{
return
0
;
}
return
(
t1
->
vgId
>
t2
->
vgId
)
?
1
:
-
1
;
}
if
(
t1
->
tid
!=
t2
->
tid
)
{
return
(
t1
->
tid
>
t2
->
tid
)
?
1
:
-
1
;
}
return
0
;
}
static
void
doBuildVgroupTableInfo
(
SArray
*
res
,
STableMetaInfo
*
pTableMetaInfo
)
{
SArray
*
pGroup
=
taosArrayInit
(
4
,
sizeof
(
SVgroupTableInfo
));
SArray
*
vgTableIdItem
=
taosArrayInit
(
4
,
sizeof
(
STableIdInfo
));
int32_t
size
=
taosArrayGetSize
(
res
);
STidTags
*
prev
=
taosArrayGet
(
res
,
0
);
int32_t
prevVgId
=
prev
->
vgId
;
STableIdInfo
item
=
{.
uid
=
prev
->
uid
,
.
tid
=
prev
->
tid
,
.
key
=
INT64_MIN
};
taosArrayPush
(
vgTableIdItem
,
&
item
);
for
(
int32_t
k
=
1
;
k
<
size
;
++
k
)
{
STidTags
*
t1
=
taosArrayGet
(
res
,
k
);
if
(
prevVgId
!=
t1
->
vgId
)
{
SVgroupTableInfo
info
=
{
0
};
void
tscBuildVgroupTableInfo
(
STableMetaInfo
*
pTableMetaInfo
,
SArray
*
tables
)
{
SArray
*
result
=
taosArrayInit
(
4
,
sizeof
(
SVgroupTableInfo
)
);
SArray
*
vgTables
=
NULL
;
STidTags
*
prev
=
NULL
;
size_t
numOfTables
=
taosArrayGetSize
(
tables
);
for
(
size_t
i
=
0
;
i
<
numOfTables
;
i
++
)
{
STidTags
*
tt
=
taosArrayGet
(
tables
,
i
);
if
(
prev
==
NULL
||
tt
->
vgId
!=
prev
->
vgId
)
{
SVgroupsInfo
*
pvg
=
pTableMetaInfo
->
vgroupList
;
for
(
int32_t
m
=
0
;
m
<
pvg
->
numOfVgroups
;
++
m
)
{
if
(
prevVgId
==
pvg
->
vgroups
[
m
].
vgId
)
{
SVgroupTableInfo
info
=
{
0
};
for
(
int32_t
m
=
0
;
m
<
pvg
->
numOfVgroups
;
++
m
)
{
if
(
tt
->
vgId
==
pvg
->
vgroups
[
m
].
vgId
)
{
info
.
vgInfo
=
pvg
->
vgroups
[
m
];
break
;
}
}
assert
(
info
.
vgInfo
.
numOfIps
!=
0
);
info
.
itemList
=
vgTableIdItem
;
taosArrayPush
(
pGroup
,
&
info
);
vgTableIdItem
=
taosArrayInit
(
4
,
sizeof
(
STableIdInfo
));
STableIdInfo
item1
=
{.
uid
=
t1
->
uid
,
.
tid
=
t1
->
tid
,
.
key
=
INT64_MIN
};
taosArrayPush
(
vgTableIdItem
,
&
item1
);
prevVgId
=
t1
->
vgId
;
}
else
{
taosArrayPush
(
vgTableIdItem
,
&
item
);
}
}
if
(
taosArrayGetSize
(
vgTableIdItem
)
>
0
)
{
SVgroupTableInfo
info
=
{
0
};
SVgroupsInfo
*
pvg
=
pTableMetaInfo
->
vgroupList
;
for
(
int32_t
m
=
0
;
m
<
pvg
->
numOfVgroups
;
++
m
)
{
if
(
prevVgId
==
pvg
->
vgroups
[
m
].
vgId
)
{
info
.
vgInfo
=
pvg
->
vgroups
[
m
];
break
;
}
assert
(
info
.
vgInfo
.
numOfIps
!=
0
);
vgTables
=
taosArrayInit
(
4
,
sizeof
(
STableIdInfo
)
);
info
.
itemList
=
vgTables
;
taosArrayPush
(
result
,
&
info
);
}
assert
(
info
.
vgInfo
.
numOfIps
!=
0
)
;
info
.
itemList
=
vgTableIdItem
;
taosArrayPush
(
pGroup
,
&
info
)
;
STableIdInfo
item
=
{
.
uid
=
tt
->
uid
,
.
tid
=
tt
->
tid
,
.
key
=
INT64_MIN
}
;
taosArrayPush
(
vgTables
,
&
item
)
;
prev
=
tt
;
}
pTableMetaInfo
->
pVgroupTables
=
pGroup
;
pTableMetaInfo
->
pVgroupTables
=
result
;
}
static
void
issueTSCompQuery
(
SSqlObj
*
pSql
,
SJoinSupporter
*
pSupporter
,
SSqlObj
*
pParent
)
{
...
...
@@ -627,8 +601,8 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
SJoinSupporter
*
p1
=
pParentSql
->
pSubs
[
0
]
->
param
;
SJoinSupporter
*
p2
=
pParentSql
->
pSubs
[
1
]
->
param
;
qsort
(
p1
->
pIdTagList
,
p1
->
num
,
p1
->
tagSize
,
t
agsOrderCompar
);
qsort
(
p2
->
pIdTagList
,
p2
->
num
,
p2
->
tagSize
,
t
agsOrderCompar
);
qsort
(
p1
->
pIdTagList
,
p1
->
num
,
p1
->
tagSize
,
t
scCompareTidTags
);
qsort
(
p2
->
pIdTagList
,
p2
->
num
,
p2
->
tagSize
,
t
scCompareTidTags
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
...
...
@@ -668,11 +642,11 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
SQueryInfo
*
pQueryInfo1
=
tscGetQueryInfoDetail
(
pSubCmd1
,
0
);
STableMetaInfo
*
pTableMetaInfo1
=
tscGetMetaInfo
(
pQueryInfo1
,
0
);
doBuildVgroupTableInfo
(
s1
,
pTableMetaInfo
1
);
tscBuildVgroupTableInfo
(
pTableMetaInfo1
,
s
1
);
SQueryInfo
*
pQueryInfo2
=
tscGetQueryInfoDetail
(
pSubCmd2
,
0
);
STableMetaInfo
*
pTableMetaInfo2
=
tscGetMetaInfo
(
pQueryInfo2
,
0
);
doBuildVgroupTableInfo
(
s2
,
pTableMetaInfo
2
);
tscBuildVgroupTableInfo
(
pTableMetaInfo2
,
s
2
);
pSupporter
->
pState
->
numOfCompleted
=
0
;
pSupporter
->
pState
->
code
=
0
;
...
...
@@ -1096,7 +1070,7 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
tscInitQueryInfo
(
pNewQueryInfo
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pNewQueryInfo
,
0
);
if
(
UTIL_TABLE_IS_SUPERTABLE
(
pTableMetaInfo
))
{
// return the tableId & tag
if
(
UTIL_TABLE_IS_SUPER
_
TABLE
(
pTableMetaInfo
))
{
// return the tableId & tag
SSchema
s
=
{
0
};
SColumnIndex
index
=
{
0
};
...
...
src/client/src/tscUtil.c
浏览文件 @
bdc235b4
...
...
@@ -157,7 +157,7 @@ bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) {
}
// for select query super table, the super table vgroup list can not be null in any cases.
if
(
pQueryInfo
->
command
==
TSDB_SQL_SELECT
&&
UTIL_TABLE_IS_SUPERTABLE
(
pTableMetaInfo
))
{
if
(
pQueryInfo
->
command
==
TSDB_SQL_SELECT
&&
UTIL_TABLE_IS_SUPER
_
TABLE
(
pTableMetaInfo
))
{
assert
(
pTableMetaInfo
->
vgroupList
!=
NULL
);
}
...
...
@@ -172,7 +172,7 @@ bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) {
if
(((
pQueryInfo
->
type
&
TSDB_QUERY_TYPE_STABLE_SUBQUERY
)
!=
TSDB_QUERY_TYPE_STABLE_SUBQUERY
)
&&
pQueryInfo
->
command
==
TSDB_SQL_SELECT
)
{
return
UTIL_TABLE_IS_SUPERTABLE
(
pTableMetaInfo
);
return
UTIL_TABLE_IS_SUPER
_
TABLE
(
pTableMetaInfo
);
}
return
false
;
...
...
@@ -187,7 +187,7 @@ bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) {
* 4. show queries, instead of a select query
*/
size_t
numOfExprs
=
tscSqlExprNumOfExprs
(
pQueryInfo
);
if
(
pTableMetaInfo
==
NULL
||
!
UTIL_TABLE_IS_SUPERTABLE
(
pTableMetaInfo
)
||
if
(
pTableMetaInfo
==
NULL
||
!
UTIL_TABLE_IS_SUPER
_
TABLE
(
pTableMetaInfo
)
||
pQueryInfo
->
command
==
TSDB_SQL_RETRIEVE_EMPTY_RESULT
||
numOfExprs
==
0
)
{
return
false
;
}
...
...
@@ -391,9 +391,11 @@ void tscPartiallyFreeSqlObj(SSqlObj* pSql) {
}
// pSql->sqlstr will be used by tscBuildQueryStreamDesc
pthread_mutex_lock
(
&
pObj
->
mutex
);
tfree
(
pSql
->
sqlstr
);
pthread_mutex_unlock
(
&
pObj
->
mutex
);
if
(
pObj
->
signature
==
pObj
)
{
pthread_mutex_lock
(
&
pObj
->
mutex
);
tfree
(
pSql
->
sqlstr
);
pthread_mutex_unlock
(
&
pObj
->
mutex
);
}
tscFreeSqlResult
(
pSql
);
...
...
@@ -1348,7 +1350,7 @@ bool tscValidateColumnId(STableMetaInfo* pTableMetaInfo, int32_t colId) {
return
false
;
}
if
(
colId
==
-
1
&&
UTIL_TABLE_IS_SUPERTABLE
(
pTableMetaInfo
))
{
if
(
colId
==
-
1
&&
UTIL_TABLE_IS_SUPER
_
TABLE
(
pTableMetaInfo
))
{
return
true
;
}
...
...
@@ -1459,10 +1461,11 @@ bool tscShouldFreeHeatBeat(SSqlObj* pHb) {
}
/*
* the following
three
kinds of SqlObj should not be freed
* the following
four
kinds of SqlObj should not be freed
* 1. SqlObj for stream computing
* 2. main SqlObj
* 3. heartbeat SqlObj
* 4. SqlObj for subscription
*
* If res code is error and SqlObj does not belong to above types, it should be
* automatically freed for async query, ignoring that connection should be kept.
...
...
@@ -1475,7 +1478,7 @@ bool tscShouldBeFreed(SSqlObj* pSql) {
}
STscObj
*
pTscObj
=
pSql
->
pTscObj
;
if
(
pSql
->
pStream
!=
NULL
||
pTscObj
->
pHb
==
pSql
||
pTscObj
->
pSql
==
pSql
)
{
if
(
pSql
->
pStream
!=
NULL
||
pTscObj
->
pHb
==
pSql
||
pTscObj
->
pSql
==
pSql
||
pSql
->
pSubscription
!=
NULL
)
{
return
false
;
}
...
...
@@ -1858,8 +1861,15 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
pFinalInfo
=
tscAddTableMetaInfo
(
pNewQueryInfo
,
name
,
pPrevTableMeta
,
pVgroupsInfo
,
pTableMetaInfo
->
tagColList
);
}
assert
(
pFinalInfo
->
pTableMeta
!=
NULL
&&
pNewQueryInfo
->
numOfTables
==
1
);
if
(
UTIL_TABLE_IS_SUPERTABLE
(
pTableMetaInfo
))
{
if
(
pFinalInfo
->
pTableMeta
==
NULL
)
{
tscError
(
"%p new subquery failed for get pMeterMeta is NULL from cache"
,
pSql
);
tscFreeSqlObj
(
pNew
);
return
NULL
;
}
assert
(
pNewQueryInfo
->
numOfTables
==
1
);
if
(
UTIL_TABLE_IS_SUPER_TABLE
(
pTableMetaInfo
))
{
assert
(
pFinalInfo
->
vgroupList
!=
NULL
);
}
...
...
@@ -2000,7 +2010,7 @@ bool hasMoreVnodesToTry(SSqlObj* pSql) {
assert
(
pRes
->
completed
);
// for normal table, do not try any more if result are exhausted
if
(
!
UTIL_TABLE_IS_SUPERTABLE
(
pTableMetaInfo
)
||
(
pTableMetaInfo
->
vgroupList
==
NULL
))
{
if
(
!
UTIL_TABLE_IS_SUPER
_
TABLE
(
pTableMetaInfo
)
||
(
pTableMetaInfo
->
vgroupList
==
NULL
))
{
return
false
;
}
...
...
@@ -2162,3 +2172,26 @@ void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pFieldInfo, int32_t column
}
}
void
*
malloc_throw
(
size_t
size
)
{
void
*
p
=
malloc
(
size
);
if
(
p
==
NULL
)
{
THROW
(
TSDB_CODE_CLI_OUT_OF_MEMORY
);
}
return
p
;
}
void
*
calloc_throw
(
size_t
nmemb
,
size_t
size
)
{
void
*
p
=
calloc
(
nmemb
,
size
);
if
(
p
==
NULL
)
{
THROW
(
TSDB_CODE_CLI_OUT_OF_MEMORY
);
}
return
p
;
}
char
*
strdup_throw
(
const
char
*
str
)
{
char
*
p
=
strdup
(
str
);
if
(
p
==
NULL
)
{
THROW
(
TSDB_CODE_CLI_OUT_OF_MEMORY
);
}
return
p
;
}
src/inc/tsdb.h
浏览文件 @
bdc235b4
...
...
@@ -34,12 +34,14 @@ extern "C" {
#define TSDB_INVALID_SUPER_TABLE_ID -1
#define TSDB_STATUS_COMMIT_START 1
#define TSDB_STATUS_COMMIT_OVER 2
// --------- TSDB APPLICATION HANDLE DEFINITION
typedef
struct
{
// WAL handle
void
*
appH
;
void
*
cqH
;
int
(
*
walCallBack
)(
void
*
);
int
(
*
notifyStatus
)(
void
*
,
int
status
);
int
(
*
eventCallBack
)(
void
*
);
}
STsdbAppH
;
...
...
src/query/inc/queryExecutor.h
浏览文件 @
bdc235b4
...
...
@@ -182,6 +182,7 @@ typedef struct SQInfo {
SQueryRuntimeEnv
runtimeEnv
;
int32_t
groupIndex
;
int32_t
offset
;
// offset in group result set of subgroup, todo refactor
SArray
*
arrTableIdInfo
;
T_REF_DECLARE
()
/*
...
...
src/query/src/queryExecutor.c
浏览文件 @
bdc235b4
...
...
@@ -113,7 +113,6 @@ static void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv);
static
void
destroyTableQueryInfo
(
STableQueryInfo
*
pTableQueryInfo
,
int32_t
numOfCols
);
static
void
resetCtxOutputBuf
(
SQueryRuntimeEnv
*
pRuntimeEnv
);
static
bool
hasMainOutput
(
SQuery
*
pQuery
);
static
void
createTableQueryInfo
(
SQInfo
*
pQInfo
);
static
void
buildTagQueryResult
(
SQInfo
*
pQInfo
);
static
int32_t
setAdditionalInfo
(
SQInfo
*
pQInfo
,
STableId
*
pTableId
,
STableQueryInfo
*
pTableQueryInfo
);
...
...
@@ -3463,7 +3462,11 @@ static bool hasMainOutput(SQuery *pQuery) {
return
false
;
}
STableQueryInfo
*
createTableQueryInfoImpl
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
STableId
tableId
,
STimeWindow
win
)
{
static
STableQueryInfo
*
createTableQueryInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
STableId
tableId
,
STimeWindow
win
)
{
STableQueryInfo
*
pTableQueryInfo
=
calloc
(
1
,
sizeof
(
STableQueryInfo
));
pTableQueryInfo
->
win
=
win
;
...
...
@@ -3870,7 +3873,18 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data
data
+=
bytes
*
numOfRows
;
}
int32_t
numOfTables
=
(
int32_t
)
taosArrayGetSize
(
pQInfo
->
arrTableIdInfo
);
*
(
int32_t
*
)
data
=
htonl
(
numOfTables
);
data
+=
sizeof
(
int32_t
);
for
(
int32_t
i
=
0
;
i
<
numOfTables
;
i
++
)
{
STableIdInfo
*
pSrc
=
taosArrayGet
(
pQInfo
->
arrTableIdInfo
,
i
);
STableIdInfo
*
pDst
=
(
STableIdInfo
*
)
data
;
pDst
->
uid
=
htobe64
(
pSrc
->
uid
);
pDst
->
tid
=
htonl
(
pSrc
->
tid
);
pDst
->
key
=
htobe64
(
pSrc
->
key
);
data
+=
sizeof
(
STableIdInfo
);
}
// all data returned, set query over
if
(
Q_STATUS_EQUAL
(
pQuery
->
status
,
QUERY_COMPLETED
))
{
if
(
pQInfo
->
runtimeEnv
.
stableQuery
&&
isIntervalQuery
(
pQuery
))
{
...
...
@@ -4149,6 +4163,42 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv) {
return
true
;
}
static
void
setupQueryHandle
(
void
*
tsdb
,
SQInfo
*
pQInfo
,
bool
isSTableQuery
)
{
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
SQuery
*
pQuery
=
pQInfo
->
runtimeEnv
.
pQuery
;
if
(
onlyQueryTags
(
pQuery
))
{
return
;
}
if
(
isSTableQuery
&&
(
!
isIntervalQuery
(
pQuery
))
&&
(
!
isFixedOutputQuery
(
pQuery
)))
{
return
;
}
STsdbQueryCond
cond
=
{
.
twindow
=
pQuery
->
window
,
.
order
=
pQuery
->
order
.
order
,
.
colList
=
pQuery
->
colList
,
.
numOfCols
=
pQuery
->
numOfCols
,
};
if
(
!
isSTableQuery
&&
(
pQInfo
->
groupInfo
.
numOfTables
==
1
)
&&
(
cond
.
order
==
TSDB_ORDER_ASC
)
&&
(
!
isIntervalQuery
(
pQuery
))
&&
(
!
isGroupbyNormalCol
(
pQuery
->
pGroupbyExpr
))
&&
(
!
isFixedOutputQuery
(
pQuery
))
)
{
SArray
*
pa
=
taosArrayGetP
(
pQInfo
->
groupInfo
.
pGroupList
,
0
);
SGroupItem
*
pItem
=
taosArrayGet
(
pa
,
0
);
cond
.
twindow
=
pItem
->
info
->
win
;
}
pRuntimeEnv
->
pQueryHandle
=
tsdbQueryTables
(
tsdb
,
&
cond
,
&
pQInfo
->
tableIdGroupInfo
);
}
int32_t
doInitQInfo
(
SQInfo
*
pQInfo
,
void
*
param
,
void
*
tsdb
,
int32_t
vgId
,
bool
isSTableQuery
)
{
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
...
...
@@ -4157,26 +4207,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool
setScanLimitationByResultBuffer
(
pQuery
);
changeExecuteScanOrder
(
pQuery
,
false
);
STsdbQueryCond
cond
=
{
.
twindow
=
pQuery
->
window
,
.
order
=
pQuery
->
order
.
order
,
.
colList
=
pQuery
->
colList
,
.
numOfCols
=
pQuery
->
numOfCols
,
};
// normal query setup the queryhandle here
if
(
!
onlyQueryTags
(
pQuery
))
{
if
(
!
isSTableQuery
&&
isFirstLastRowQuery
(
pQuery
))
{
// in case of last_row query, invoke a different API.
pRuntimeEnv
->
pQueryHandle
=
tsdbQueryLastRow
(
tsdb
,
&
cond
,
&
pQInfo
->
tableIdGroupInfo
);
}
else
if
(
!
isSTableQuery
||
isIntervalQuery
(
pQuery
)
||
isFixedOutputQuery
(
pQuery
))
{
pRuntimeEnv
->
pQueryHandle
=
tsdbQueryTables
(
tsdb
,
&
cond
,
&
pQInfo
->
tableIdGroupInfo
);
}
// create the table query support structures
createTableQueryInfo
(
pQInfo
);
}
setupQueryHandle
(
tsdb
,
pQInfo
,
isSTableQuery
);
pQInfo
->
tsdb
=
tsdb
;
pQInfo
->
vgId
=
vgId
;
...
...
@@ -4595,6 +4626,13 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
* to ensure that, we can reset the query range once query on a meter is completed.
*/
pQInfo
->
tableIndex
++
;
STableIdInfo
tidInfo
;
tidInfo
.
uid
=
item
->
id
.
uid
;
tidInfo
.
tid
=
item
->
id
.
tid
;
tidInfo
.
key
=
pQuery
->
current
->
lastKey
;
taosArrayPush
(
pQInfo
->
arrTableIdInfo
,
&
tidInfo
);
// if the buffer is full or group by each table, we need to jump out of the loop
if
(
Q_STATUS_EQUAL
(
pQuery
->
status
,
QUERY_RESBUF_FULL
)
/*||
isGroupbyEachTable(pQuery->pGroupbyExpr, pSupporter->pSidSet)*/
)
{
...
...
@@ -4664,35 +4702,6 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
pQuery
->
limit
.
offset
);
}
static
void
createTableQueryInfo
(
SQInfo
*
pQInfo
)
{
SQuery
*
pQuery
=
pQInfo
->
runtimeEnv
.
pQuery
;
// todo make sure the table are added the reference count to gauranteed that all involved tables are valid
size_t
numOfGroups
=
taosArrayGetSize
(
pQInfo
->
groupInfo
.
pGroupList
);
int32_t
index
=
0
;
for
(
int32_t
i
=
0
;
i
<
numOfGroups
;
++
i
)
{
// load all meter meta info
SArray
*
group
=
*
(
SArray
**
)
taosArrayGet
(
pQInfo
->
groupInfo
.
pGroupList
,
i
);
size_t
s
=
taosArrayGetSize
(
group
);
for
(
int32_t
j
=
0
;
j
<
s
;
++
j
)
{
SGroupItem
*
item
=
(
SGroupItem
*
)
taosArrayGet
(
group
,
j
);
// STableQueryInfo has been created for each table
if
(
item
->
info
!=
NULL
)
{
return
;
}
STableQueryInfo
*
pInfo
=
createTableQueryInfoImpl
(
&
pQInfo
->
runtimeEnv
,
item
->
id
,
pQuery
->
window
);
pInfo
->
groupIdx
=
i
;
pInfo
->
tableIndex
=
index
;
item
->
info
=
pInfo
;
index
+=
1
;
}
}
}
static
void
doSaveContext
(
SQInfo
*
pQInfo
)
{
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
...
...
@@ -4914,6 +4923,12 @@ static void tableMultiOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
if
(
Q_STATUS_EQUAL
(
pQuery
->
status
,
QUERY_RESBUF_FULL
))
{
qTrace
(
"QInfo:%p query paused due to output limitation, next qrange:%"
PRId64
"-%"
PRId64
,
pQInfo
,
pQuery
->
current
->
lastKey
,
pQuery
->
window
.
ekey
);
}
else
if
(
Q_STATUS_EQUAL
(
pQuery
->
status
,
QUERY_COMPLETED
))
{
STableIdInfo
tidInfo
;
tidInfo
.
uid
=
pQuery
->
current
->
id
.
uid
;
tidInfo
.
tid
=
pQuery
->
current
->
id
.
tid
;
tidInfo
.
key
=
pQuery
->
current
->
lastKey
;
taosArrayPush
(
pQInfo
->
arrTableIdInfo
,
&
tidInfo
);
}
if
(
!
isTSCompQuery
(
pQuery
))
{
...
...
@@ -5197,20 +5212,10 @@ static bool validateQuerySourceCols(SQueryTableMsg *pQueryMsg, SSqlFuncMsg** pEx
static
char
*
createTableIdList
(
SQueryTableMsg
*
pQueryMsg
,
char
*
pMsg
,
SArray
**
pTableIdList
)
{
assert
(
pQueryMsg
->
numOfTables
>
0
);
*
pTableIdList
=
taosArrayInit
(
pQueryMsg
->
numOfTables
,
sizeof
(
STableId
));
STableIdInfo
*
pTableIdInfo
=
(
STableIdInfo
*
)
pMsg
;
pTableIdInfo
->
tid
=
htonl
(
pTableIdInfo
->
tid
);
pTableIdInfo
->
uid
=
htobe64
(
pTableIdInfo
->
uid
);
pTableIdInfo
->
key
=
htobe64
(
pTableIdInfo
->
key
);
STableId
id
=
{.
uid
=
pTableIdInfo
->
uid
,
.
tid
=
pTableIdInfo
->
tid
};
taosArrayPush
(
*
pTableIdList
,
&
id
);
pMsg
+=
sizeof
(
STableIdInfo
);
*
pTableIdList
=
taosArrayInit
(
pQueryMsg
->
numOfTables
,
sizeof
(
STableIdInfo
));
for
(
int32_t
j
=
1
;
j
<
pQueryMsg
->
numOfTables
;
++
j
)
{
pTableIdInfo
=
(
STableIdInfo
*
)
pMsg
;
for
(
int32_t
j
=
0
;
j
<
pQueryMsg
->
numOfTables
;
++
j
)
{
STableIdInfo
*
pTableIdInfo
=
(
STableIdInfo
*
)
pMsg
;
pTableIdInfo
->
tid
=
htonl
(
pTableIdInfo
->
tid
);
pTableIdInfo
->
uid
=
htobe64
(
pTableIdInfo
->
uid
);
...
...
@@ -5661,7 +5666,16 @@ static void doUpdateExprColumnIndex(SQuery *pQuery) {
}
}
static
SQInfo
*
createQInfoImpl
(
SQueryTableMsg
*
pQueryMsg
,
SSqlGroupbyExpr
*
pGroupbyExpr
,
SExprInfo
*
pExprs
,
static
int
compareTableIdInfo
(
const
void
*
a
,
const
void
*
b
)
{
const
STableIdInfo
*
x
=
(
const
STableIdInfo
*
)
a
;
const
STableIdInfo
*
y
=
(
const
STableIdInfo
*
)
b
;
if
(
x
->
uid
>
y
->
uid
)
return
1
;
if
(
x
->
uid
<
y
->
uid
)
return
-
1
;
return
0
;
}
static
SQInfo
*
createQInfoImpl
(
SQueryTableMsg
*
pQueryMsg
,
SArray
*
pTableIdList
,
SSqlGroupbyExpr
*
pGroupbyExpr
,
SExprInfo
*
pExprs
,
STableGroupInfo
*
groupInfo
,
SColumnInfo
*
pTagCols
)
{
SQInfo
*
pQInfo
=
(
SQInfo
*
)
calloc
(
1
,
sizeof
(
SQInfo
));
if
(
pQInfo
==
NULL
)
{
...
...
@@ -5754,6 +5768,9 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
pQInfo
->
groupInfo
.
pGroupList
=
taosArrayInit
(
numOfGroups
,
POINTER_BYTES
);
pQInfo
->
groupInfo
.
numOfTables
=
groupInfo
->
numOfTables
;
int
tableIndex
=
0
;
STimeWindow
window
=
pQueryMsg
->
window
;
taosArraySort
(
pTableIdList
,
compareTableIdInfo
);
for
(
int32_t
i
=
0
;
i
<
numOfGroups
;
++
i
)
{
SArray
*
pa
=
taosArrayGetP
(
groupInfo
->
pGroupList
,
i
);
size_t
s
=
taosArrayGetSize
(
pa
);
...
...
@@ -5761,13 +5778,26 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
SArray
*
p1
=
taosArrayInit
(
s
,
sizeof
(
SGroupItem
));
for
(
int32_t
j
=
0
;
j
<
s
;
++
j
)
{
SGroupItem
item
=
{
.
id
=
*
(
STableId
*
)
taosArrayGet
(
pa
,
j
),
.
info
=
NULL
,
};
STableId
id
=
*
(
STableId
*
)
taosArrayGet
(
pa
,
j
);
SGroupItem
item
=
{
.
id
=
id
};
// NOTE: compare STableIdInfo with STableId
// not a problem at present because we only use their 1st int64_t field
STableIdInfo
*
pTableId
=
taosArraySearch
(
pTableIdList
,
compareTableIdInfo
,
&
id
);
if
(
pTableId
!=
NULL
)
{
window
.
skey
=
pTableId
->
key
;
}
else
{
window
.
skey
=
pQueryMsg
->
window
.
skey
;
}
item
.
info
=
createTableQueryInfo
(
&
pQInfo
->
runtimeEnv
,
item
.
id
,
window
);
item
.
info
->
groupIdx
=
i
;
item
.
info
->
tableIndex
=
tableIndex
++
;
taosArrayPush
(
p1
,
&
item
);
}
taosArrayPush
(
pQInfo
->
groupInfo
.
pGroupList
,
&
p1
);
}
pQInfo
->
arrTableIdInfo
=
taosArrayInit
(
tableIndex
,
sizeof
(
STableIdInfo
));
pQuery
->
pos
=
-
1
;
pQuery
->
window
=
pQueryMsg
->
window
;
...
...
@@ -5919,6 +5949,7 @@ static void freeQInfo(SQInfo *pQInfo) {
}
taosArrayDestroy
(
pQInfo
->
tableIdGroupInfo
.
pGroupList
);
taosArrayDestroy
(
pQInfo
->
arrTableIdInfo
);
if
(
pQuery
->
pGroupbyExpr
!=
NULL
)
{
taosArrayDestroy
(
pQuery
->
pGroupbyExpr
->
columnInfo
);
...
...
@@ -6046,32 +6077,48 @@ int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qi
if
(
TSDB_QUERY_HAS_TYPE
(
pQueryMsg
->
queryType
,
TSDB_QUERY_TYPE_MULTITABLE_QUERY
|
TSDB_QUERY_TYPE_TABLE_QUERY
))
{
isSTableQuery
=
TSDB_QUERY_HAS_TYPE
(
pQueryMsg
->
queryType
,
TSDB_QUERY_TYPE_MULTITABLE_QUERY
);
STableId
*
id
=
taosArrayGet
(
pTableIdList
,
0
);
STableId
Info
*
id
=
taosArrayGet
(
pTableIdList
,
0
);
if
((
code
=
tsdbGetOneTableGroup
(
tsdb
,
id
->
uid
,
&
groupInfo
))
!=
TSDB_CODE_SUCCESS
)
{
goto
_over
;
}
}
else
if
(
TSDB_QUERY_HAS_TYPE
(
pQueryMsg
->
queryType
,
TSDB_QUERY_TYPE_STABLE_QUERY
))
{
isSTableQuery
=
true
;
STableId
*
id
=
taosArrayGet
(
pTableIdList
,
0
);
// group by normal column, do not pass the group by condition to tsdb to group table into different group
int32_t
numOfGroupByCols
=
pQueryMsg
->
numOfGroupCols
;
if
(
pQueryMsg
->
numOfGroupCols
==
1
&&
!
TSDB_COL_IS_TAG
(
pGroupColIndex
->
flag
))
{
numOfGroupByCols
=
0
;
}
// todo handle the error
/*int32_t ret =*/
tsdbQuerySTableByTagCond
(
tsdb
,
id
->
uid
,
tagCond
,
pQueryMsg
->
tagCondLen
,
pQueryMsg
->
tagNameRelType
,
tbnameCond
,
&
groupInfo
,
pGroupColIndex
,
numOfGroupByCols
);
if
(
groupInfo
.
numOfTables
==
0
)
{
// no qualified tables no need to do query
code
=
TSDB_CODE_SUCCESS
;
goto
_over
;
// TODO: need a macro from TSDB to check if table is super table,
// also note there's possiblity that only one table in the super table
if
(
taosArrayGetSize
(
pTableIdList
)
==
1
)
{
STableIdInfo
*
id
=
taosArrayGet
(
pTableIdList
,
0
);
// if array size is 1 and assert super table
// group by normal column, do not pass the group by condition to tsdb to group table into different group
int32_t
numOfGroupByCols
=
pQueryMsg
->
numOfGroupCols
;
if
(
pQueryMsg
->
numOfGroupCols
==
1
&&
!
TSDB_COL_IS_TAG
(
pGroupColIndex
->
flag
))
{
numOfGroupByCols
=
0
;
}
// todo handle the error
/*int32_t ret =*/
tsdbQuerySTableByTagCond
(
tsdb
,
id
->
uid
,
tagCond
,
pQueryMsg
->
tagCondLen
,
pQueryMsg
->
tagNameRelType
,
tbnameCond
,
&
groupInfo
,
pGroupColIndex
,
numOfGroupByCols
);
if
(
groupInfo
.
numOfTables
==
0
)
{
// no qualified tables no need to do query
code
=
TSDB_CODE_SUCCESS
;
goto
_over
;
}
}
else
{
groupInfo
.
numOfTables
=
taosArrayGetSize
(
pTableIdList
);
SArray
*
pTableGroup
=
taosArrayInit
(
1
,
POINTER_BYTES
);
SArray
*
sa
=
taosArrayInit
(
groupInfo
.
numOfTables
,
sizeof
(
STableId
));
for
(
int32_t
i
=
0
;
i
<
groupInfo
.
numOfTables
;
++
i
)
{
STableIdInfo
*
tableId
=
taosArrayGet
(
pTableIdList
,
i
);
taosArrayPush
(
sa
,
tableId
);
}
taosArrayPush
(
pTableGroup
,
&
sa
);
groupInfo
.
pGroupList
=
pTableGroup
;
}
}
else
{
assert
(
0
);
}
(
*
pQInfo
)
=
createQInfoImpl
(
pQueryMsg
,
pGroupbyExpr
,
pExprs
,
&
groupInfo
,
pTagColumnInfo
);
(
*
pQInfo
)
=
createQInfoImpl
(
pQueryMsg
,
p
TableIdList
,
p
GroupbyExpr
,
pExprs
,
&
groupInfo
,
pTagColumnInfo
);
if
((
*
pQInfo
)
==
NULL
)
{
code
=
TSDB_CODE_SERV_OUT_OF_MEMORY
;
goto
_over
;
...
...
@@ -6169,6 +6216,8 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
SQuery
*
pQuery
=
pQInfo
->
runtimeEnv
.
pQuery
;
size_t
size
=
getResultSize
(
pQInfo
,
&
pQuery
->
rec
.
rows
);
size
+=
sizeof
(
int32_t
);
size
+=
sizeof
(
STableIdInfo
)
*
taosArrayGetSize
(
pQInfo
->
arrTableIdInfo
);
*
contLen
=
size
+
sizeof
(
SRetrieveTableRsp
);
// todo handle failed to allocate memory
...
...
src/tsdb/src/tsdbMain.c
浏览文件 @
bdc235b4
...
...
@@ -284,6 +284,7 @@ int32_t tsdbCloseRepo(TsdbRepoT *repo) {
pRepo
->
tsdbCache
->
curBlock
=
NULL
;
tsdbUnLockRepo
(
repo
);
if
(
pRepo
->
appH
.
notifyStatus
)
pRepo
->
appH
.
notifyStatus
(
pRepo
->
appH
.
appH
,
TSDB_STATUS_COMMIT_START
);
tsdbCommitData
((
void
*
)
repo
);
tsdbCloseFileH
(
pRepo
->
tsdbFileH
);
...
...
@@ -330,7 +331,7 @@ int32_t tsdbConfigRepo(TsdbRepoT *repo, STsdbCfg *pCfg) {
int32_t
tsdbTriggerCommit
(
TsdbRepoT
*
repo
)
{
STsdbRepo
*
pRepo
=
(
STsdbRepo
*
)
repo
;
if
(
pRepo
->
appH
.
walCallBack
)
pRepo
->
appH
.
walCallBack
(
pRepo
->
appH
.
appH
);
if
(
pRepo
->
appH
.
notifyStatus
)
pRepo
->
appH
.
notifyStatus
(
pRepo
->
appH
.
appH
,
TSDB_STATUS_COMMIT_START
);
tsdbLockRepo
(
repo
);
if
(
pRepo
->
commit
)
{
...
...
@@ -942,7 +943,6 @@ static void tsdbFreeMemTable(SMemTable *pMemTable) {
// Commit to file
static
void
*
tsdbCommitData
(
void
*
arg
)
{
printf
(
"Starting to commit....
\n
"
);
STsdbRepo
*
pRepo
=
(
STsdbRepo
*
)
arg
;
STsdbMeta
*
pMeta
=
pRepo
->
tsdbMeta
;
STsdbCache
*
pCache
=
pRepo
->
tsdbCache
;
...
...
@@ -951,6 +951,8 @@ static void *tsdbCommitData(void *arg) {
SRWHelper
whelper
=
{
0
};
if
(
pCache
->
imem
==
NULL
)
return
NULL
;
tsdbPrint
(
"vgId: %d, starting to commit...."
,
pRepo
->
config
.
tsdbId
);
// Create the iterator to read from cache
SSkipListIterator
**
iters
=
tsdbCreateTableIters
(
pMeta
,
pCfg
->
maxTables
);
if
(
iters
==
NULL
)
{
...
...
@@ -974,6 +976,7 @@ static void *tsdbCommitData(void *arg) {
// Do retention actions
tsdbFitRetention
(
pRepo
);
if
(
pRepo
->
appH
.
notifyStatus
)
pRepo
->
appH
.
notifyStatus
(
pRepo
->
appH
.
appH
,
TSDB_STATUS_COMMIT_OVER
);
_exit:
tdFreeDataCols
(
pDataCols
);
...
...
@@ -1176,4 +1179,4 @@ uint32_t tsdbGetFileInfo(TsdbRepoT *repo, char *name, uint32_t *index, int32_t *
magic
=
*
size
;
return
magic
;
}
\ No newline at end of file
}
src/tsdb/src/tsdbRead.c
浏览文件 @
bdc235b4
...
...
@@ -1525,7 +1525,7 @@ int32_t tsdbQuerySTableByTagCond(TsdbRepoT *tsdb, int64_t uid, const char *pTagC
}
if
(
pTable
->
type
!=
TSDB_SUPER_TABLE
)
{
uError
(
"%p query normal tag not allowed, uid:%
, tid:%d, name:%s"
PRIu64
,
uError
(
"%p query normal tag not allowed, uid:%
"
PRIu64
", tid:%d, name:%s"
,
tsdb
,
uid
,
pTable
->
tableId
.
tid
,
pTable
->
name
);
return
TSDB_CODE_OPS_NOT_SUPPORT
;
//basically, this error is caused by invalid sql issued by client
...
...
src/util/inc/exception.h
浏览文件 @
bdc235b4
...
...
@@ -73,6 +73,7 @@ void cleanupPush_void_ptr_bool ( bool failOnly, void* func, void* arg1, bool ar
void
cleanupPush_void_ptr
(
bool
failOnly
,
void
*
func
,
void
*
arg
);
void
cleanupPush_int_int
(
bool
failOnly
,
void
*
func
,
int
arg
);
void
cleanupPush_void
(
bool
failOnly
,
void
*
func
);
void
cleanupPush_int_ptr
(
bool
failOnly
,
void
*
func
,
void
*
arg
);
int32_t
cleanupGetActionCount
();
void
cleanupExecuteTo
(
int32_t
anchor
,
bool
failed
);
...
...
@@ -83,8 +84,10 @@ void cleanupExecute( SExceptionNode* node, bool failed );
#define CLEANUP_PUSH_VOID_PTR( failOnly, func, arg ) cleanupPush_void_ptr( (failOnly), (void*)(func), (void*)(arg) )
#define CLEANUP_PUSH_INT_INT( failOnly, func, arg ) cleanupPush_void_ptr( (failOnly), (void*)(func), (int)(arg) )
#define CLEANUP_PUSH_VOID( failOnly, func ) cleanupPush_void( (failOnly), (void*)(func) )
#define CLEANUP_PUSH_INT_PTR( failOnly, func, arg ) cleanupPush_int_ptr( (failOnly), (void*)(func), (void*)(arg) )
#define CLEANUP_PUSH_FREE( failOnly, arg ) cleanupPush_void_ptr( (failOnly), free, (void*)(arg) )
#define CLEANUP_PUSH_CLOSE( failOnly, arg ) cleanupPush_int_int( (failOnly), close, (int)(arg) )
#define CLEANUP_PUSH_FCLOSE( failOnly, arg ) cleanupPush_int_ptr( (failOnly), fclose, (void*)(arg) )
#define CLEANUP_GET_ANCHOR() cleanupGetActionCount()
#define CLEANUP_EXECUTE_TO( anchor, failed ) cleanupExecuteTo( (anchor), (failed) )
...
...
@@ -95,7 +98,7 @@ void cleanupExecute( SExceptionNode* node, bool failed );
void
exceptionPushNode
(
SExceptionNode
*
node
);
int32_t
exceptionPopNode
();
void
exceptionThrow
(
int
code
);
void
exceptionThrow
(
int
32_t
code
);
#define TRY(maxCleanupActions) do { \
SExceptionNode exceptionNode = { 0 }; \
...
...
@@ -106,10 +109,10 @@ void exceptionThrow( int code );
int caughtException = setjmp( exceptionNode.jb ); \
if( caughtException == 0 )
#define CATCH( code ) int code = exceptionPopNode(); \
#define CATCH( code ) int
32_t
code = exceptionPopNode(); \
if( caughtException == 1 )
#define FINALLY( code ) int code = exceptionPopNode();
#define FINALLY( code ) int
32_t
code = exceptionPopNode();
#define END_TRY } while( 0 );
...
...
src/util/inc/tarray.h
浏览文件 @
bdc235b4
...
...
@@ -106,6 +106,12 @@ void taosArrayCopy(SArray* pDst, const SArray* pSrc);
*/
SArray
*
taosArrayClone
(
const
SArray
*
pSrc
);
/**
* clear the array (remove all element)
* @param pArray
*/
void
taosArrayClear
(
SArray
*
pArray
);
/**
* destroy array list
* @param pArray
...
...
src/util/src/exception.c
浏览文件 @
bdc235b4
...
...
@@ -14,7 +14,7 @@ int32_t exceptionPopNode() {
return
node
->
code
;
}
void
exceptionThrow
(
int
code
)
{
void
exceptionThrow
(
int
32_t
code
)
{
expList
->
code
=
code
;
longjmp
(
expList
->
jb
,
1
);
}
...
...
@@ -38,21 +38,27 @@ static void cleanupWrapper_void_ptr( SCleanupAction* ca ) {
static
void
cleanupWrapper_int_int
(
SCleanupAction
*
ca
)
{
int
(
*
func
)(
int
)
=
ca
->
func
;
func
(
(
int
)(
intptr_t
)(
ca
->
arg1
.
Int
)
);
func
(
ca
->
arg1
.
Int
);
}
static
void
cleanupWrapper_void
_void
(
SCleanupAction
*
ca
)
{
static
void
cleanupWrapper_void
(
SCleanupAction
*
ca
)
{
void
(
*
func
)()
=
ca
->
func
;
func
();
}
static
void
cleanupWrapper_int_ptr
(
SCleanupAction
*
ca
)
{
int
(
*
func
)(
void
*
)
=
ca
->
func
;
func
(
ca
->
arg1
.
Ptr
);
}
typedef
void
(
*
wrapper
)(
SCleanupAction
*
);
static
wrapper
wrappers
[]
=
{
cleanupWrapper_void_ptr_ptr
,
cleanupWrapper_void_ptr_bool
,
cleanupWrapper_void_ptr
,
cleanupWrapper_int_int
,
cleanupWrapper_void_void
,
cleanupWrapper_void
,
cleanupWrapper_int_ptr
,
};
...
...
@@ -107,6 +113,15 @@ void cleanupPush_void( bool failOnly, void* func ) {
ca
->
func
=
func
;
}
void
cleanupPush_int_ptr
(
bool
failOnly
,
void
*
func
,
void
*
arg
)
{
assert
(
expList
->
numCleanupAction
<
expList
->
maxCleanupAction
);
SCleanupAction
*
ca
=
expList
->
cleanupActions
+
expList
->
numCleanupAction
++
;
ca
->
wrapper
=
5
;
ca
->
failOnly
=
failOnly
;
ca
->
func
=
func
;
ca
->
arg1
.
Ptr
=
arg
;
}
int32_t
cleanupGetActionCount
()
{
...
...
@@ -118,8 +133,9 @@ static void doExecuteCleanup( SExceptionNode* node, int32_t anchor, bool failed
while
(
node
->
numCleanupAction
>
anchor
)
{
--
node
->
numCleanupAction
;
SCleanupAction
*
ca
=
node
->
cleanupActions
+
node
->
numCleanupAction
;
if
(
failed
||
!
(
ca
->
failOnly
)
)
if
(
failed
||
!
(
ca
->
failOnly
)
)
{
wrappers
[
ca
->
wrapper
](
ca
);
}
}
}
...
...
src/util/src/tarray.c
浏览文件 @
bdc235b4
...
...
@@ -176,6 +176,11 @@ SArray* taosArrayClone(const SArray* pSrc) {
return
dst
;
}
void
taosArrayClear
(
SArray
*
pArray
)
{
assert
(
pArray
!=
NULL
);
pArray
->
size
=
0
;
}
void
taosArrayDestroy
(
SArray
*
pArray
)
{
if
(
pArray
==
NULL
)
{
return
;
...
...
src/util/src/tcache.c
浏览文件 @
bdc235b4
...
...
@@ -439,6 +439,8 @@ void *taosCachePut(SCacheObj *pCacheObj, const char *key, const void *pData, siz
uTrace
(
"key:%s %p added into cache, added:%"
PRIu64
", expire:%"
PRIu64
", total:%d, size:%"
PRId64
" bytes"
,
key
,
pNode
,
pNode
->
addedTime
,
pNode
->
expiredTime
,
pCacheObj
->
totalSize
,
dataSize
);
}
else
{
uError
(
"key:%s failed to added into cache, out of memory"
,
key
);
}
}
else
{
// old data exists, update the node
pNode
=
taosUpdateCacheImpl
(
pCacheObj
,
pOld
,
key
,
keyLen
,
pData
,
dataSize
,
duration
*
1000L
);
...
...
src/vnode/inc/vnodeInt.h
浏览文件 @
bdc235b4
...
...
@@ -38,6 +38,7 @@ typedef struct {
int
status
;
int8_t
role
;
int64_t
version
;
int64_t
savedVersion
;
void
*
wqueue
;
void
*
rqueue
;
void
*
wal
;
...
...
src/vnode/src/vnodeMain.c
浏览文件 @
bdc235b4
...
...
@@ -33,12 +33,11 @@ static int32_t tsOpennedVnodes;
static
void
*
tsDnodeVnodesHash
;
static
void
vnodeCleanUp
(
SVnodeObj
*
pVnode
);
static
void
vnodeBuildVloadMsg
(
char
*
pNode
,
void
*
param
);
static
int
vnodeWalCallback
(
void
*
arg
);
static
int32_t
vnodeSaveCfg
(
SMDCreateVnodeMsg
*
pVnodeCfg
);
static
int32_t
vnodeReadCfg
(
SVnodeObj
*
pVnode
);
static
int32_t
vnodeSaveVersion
(
SVnodeObj
*
pVnode
);
static
bool
vnodeReadVersion
(
SVnodeObj
*
pVnode
);
static
int
vnode
WalCallback
(
void
*
arg
);
static
int
vnode
ProcessTsdbStatus
(
void
*
arg
,
int
status
);
static
uint32_t
vnodeGetFileInfo
(
void
*
ahandle
,
char
*
name
,
uint32_t
*
index
,
int32_t
*
size
);
static
int
vnodeGetWalInfo
(
void
*
ahandle
,
char
*
name
,
uint32_t
*
index
);
static
void
vnodeNotifyRole
(
void
*
ahandle
,
int8_t
role
);
...
...
@@ -206,7 +205,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
STsdbAppH
appH
=
{
0
};
appH
.
appH
=
(
void
*
)
pVnode
;
appH
.
walCallBack
=
vnodeWalCallback
;
appH
.
notifyStatus
=
vnodeProcessTsdbStatus
;
appH
.
cqH
=
pVnode
->
cq
;
sprintf
(
temp
,
"%s/tsdb"
,
rootDir
);
...
...
@@ -374,14 +373,22 @@ static void vnodeCleanUp(SVnodeObj *pVnode) {
walClose
(
pVnode
->
wal
);
pVnode
->
wal
=
NULL
;
vnodeSaveVersion
(
pVnode
);
vnodeRelease
(
pVnode
);
}
// TODO: this is a simple implement
static
int
vnode
WalCallback
(
void
*
arg
)
{
static
int
vnode
ProcessTsdbStatus
(
void
*
arg
,
int
status
)
{
SVnodeObj
*
pVnode
=
arg
;
return
walRenew
(
pVnode
->
wal
);
if
(
status
==
TSDB_STATUS_COMMIT_START
)
{
pVnode
->
savedVersion
=
pVnode
->
version
;
return
walRenew
(
pVnode
->
wal
);
}
if
(
status
==
TSDB_STATUS_COMMIT_OVER
)
return
vnodeSaveVersion
(
pVnode
);
return
0
;
}
static
uint32_t
vnodeGetFileInfo
(
void
*
ahandle
,
char
*
name
,
uint32_t
*
index
,
int32_t
*
size
)
{
...
...
@@ -414,7 +421,7 @@ static void vnodeNotifyFileSynced(void *ahandle) {
tsdbCloseRepo
(
pVnode
->
tsdb
);
STsdbAppH
appH
=
{
0
};
appH
.
appH
=
(
void
*
)
pVnode
;
appH
.
walCallBack
=
vnodeWalCallback
;
appH
.
notifyStatus
=
vnodeProcessTsdbStatus
;
appH
.
cqH
=
pVnode
->
cq
;
pVnode
->
tsdb
=
tsdbOpenRepo
(
rootDir
,
&
appH
);
}
...
...
@@ -685,14 +692,14 @@ static int32_t vnodeSaveVersion(SVnodeObj *pVnode) {
char
*
content
=
calloc
(
1
,
maxLen
+
1
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"{
\n
"
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
version
\"
: %"
PRId64
"
\n
"
,
pVnode
->
v
ersion
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
version
\"
: %"
PRId64
"
\n
"
,
pVnode
->
savedV
ersion
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"}
\n
"
);
fwrite
(
content
,
1
,
len
,
fp
);
fclose
(
fp
);
free
(
content
);
vPrint
(
"vgId:%d, save vnode version:%"
PRId64
" succe
ssed"
,
pVnode
->
vgId
,
pVnode
->
v
ersion
);
vPrint
(
"vgId:%d, save vnode version:%"
PRId64
" succe
ed"
,
pVnode
->
vgId
,
pVnode
->
savedV
ersion
);
return
0
;
}
...
...
@@ -734,7 +741,7 @@ static bool vnodeReadVersion(SVnodeObj *pVnode) {
ret
=
true
;
vPrint
(
"vgId:%d, read vnode version succe
ssed, version:%
%"
PRId64
,
pVnode
->
vgId
,
pVnode
->
version
);
vPrint
(
"vgId:%d, read vnode version succe
ed, version:
%"
PRId64
,
pVnode
->
vgId
,
pVnode
->
version
);
PARSE_OVER:
free
(
content
);
...
...
tests/examples/c/subscribe.c
浏览文件 @
bdc235b4
...
...
@@ -56,32 +56,46 @@ void run_test(TAOS* taos) {
taos_query
(
taos
,
"drop database if exists test;"
);
usleep
(
100000
);
taos_query
(
taos
,
"create database test tables 5;"
);
//taos_query(taos, "create database test tables 5;");
taos_query
(
taos
,
"create database test;"
);
usleep
(
100000
);
taos_query
(
taos
,
"use test;"
);
usleep
(
100000
);
taos_query
(
taos
,
"create table meters(ts timestamp, a int, b binary(20)) tags(loc binary(20), area int);"
);
taos_query
(
taos
,
"insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:00:00.000', 0, 'china');"
);
taos_query
(
taos
,
"insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:01:00.000', 0, 'china');"
);
taos_query
(
taos
,
"insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:02:00.000', 0, 'china');"
);
taos_query
(
taos
,
"insert into t1 using meters tags('shanghai', 0) values('2020-01-01 00:00:00.000', 0, 'china');"
);
taos_query
(
taos
,
"insert into t1 using meters tags('shanghai', 0) values('2020-01-01 00:01:00.000', 0, 'china');"
);
taos_query
(
taos
,
"insert into t1 using meters tags('shanghai', 0) values('2020-01-01 00:02:00.000', 0, 'china');"
);
taos_query
(
taos
,
"insert into t1 using meters tags('shanghai', 0) values('2020-01-01 00:03:00.000', 0, 'china');"
);
taos_query
(
taos
,
"insert into t2 using meters tags('london', 0) values('2020-01-01 00:00:00.000', 0, 'UK');"
);
taos_query
(
taos
,
"insert into t2 using meters tags('london', 0) values('2020-01-01 00:01:00.000', 0, 'UK');"
);
taos_query
(
taos
,
"insert into t2 using meters tags('london', 0) values('2020-01-01 00:01:01.000', 0, 'UK');"
);
taos_query
(
taos
,
"insert into t2 using meters tags('london', 0) values('2020-01-01 00:01:02.000', 0, 'UK');"
);
taos_query
(
taos
,
"insert into t3 using meters tags('tianjin', 0) values('2020-01-01 00:01:02.000', 0, 'china');"
);
taos_query
(
taos
,
"insert into t4 using meters tags('wuhan', 0) values('2020-01-01 00:01:02.000', 0, 'china');"
);
taos_query
(
taos
,
"insert into t5 using meters tags('jinan', 0) values('2020-01-01 00:01:02.000', 0, 'china');"
);
taos_query
(
taos
,
"insert into t6 using meters tags('haikou', 0) values('2020-01-01 00:01:02.000', 0, 'china');"
);
taos_query
(
taos
,
"insert into t7 using meters tags('nanjing', 0) values('2020-01-01 00:01:02.000', 0, 'china');"
);
taos_query
(
taos
,
"insert into t8 using meters tags('lanzhou', 0) values('2020-01-01 00:01:02.000', 0, 'china');"
);
taos_query
(
taos
,
"insert into t9 using meters tags('tokyo', 0) values('2020-01-01 00:01:02.000', 0, 'japan');"
);
taos_query
(
taos
,
"create table meters(ts timestamp, a int) tags(area int);"
);
taos_query
(
taos
,
"create table t0 using meters tags(0);"
);
taos_query
(
taos
,
"create table t1 using meters tags(1);"
);
taos_query
(
taos
,
"create table t2 using meters tags(2);"
);
taos_query
(
taos
,
"create table t3 using meters tags(3);"
);
taos_query
(
taos
,
"create table t4 using meters tags(4);"
);
taos_query
(
taos
,
"create table t5 using meters tags(5);"
);
taos_query
(
taos
,
"create table t6 using meters tags(6);"
);
taos_query
(
taos
,
"create table t7 using meters tags(7);"
);
taos_query
(
taos
,
"create table t8 using meters tags(8);"
);
taos_query
(
taos
,
"create table t9 using meters tags(9);"
);
taos_query
(
taos
,
"insert into t0 values('2020-01-01 00:00:00.000', 0);"
);
taos_query
(
taos
,
"insert into t0 values('2020-01-01 00:01:00.000', 0);"
);
taos_query
(
taos
,
"insert into t0 values('2020-01-01 00:02:00.000', 0);"
);
taos_query
(
taos
,
"insert into t1 values('2020-01-01 00:00:00.000', 0);"
);
taos_query
(
taos
,
"insert into t1 values('2020-01-01 00:01:00.000', 0);"
);
taos_query
(
taos
,
"insert into t1 values('2020-01-01 00:02:00.000', 0);"
);
taos_query
(
taos
,
"insert into t1 values('2020-01-01 00:03:00.000', 0);"
);
taos_query
(
taos
,
"insert into t2 values('2020-01-01 00:00:00.000', 0);"
);
taos_query
(
taos
,
"insert into t2 values('2020-01-01 00:01:00.000', 0);"
);
taos_query
(
taos
,
"insert into t2 values('2020-01-01 00:01:01.000', 0);"
);
taos_query
(
taos
,
"insert into t2 values('2020-01-01 00:01:02.000', 0);"
);
taos_query
(
taos
,
"insert into t3 values('2020-01-01 00:01:02.000', 0);"
);
taos_query
(
taos
,
"insert into t4 values('2020-01-01 00:01:02.000', 0);"
);
taos_query
(
taos
,
"insert into t5 values('2020-01-01 00:01:02.000', 0);"
);
taos_query
(
taos
,
"insert into t6 values('2020-01-01 00:01:02.000', 0);"
);
taos_query
(
taos
,
"insert into t7 values('2020-01-01 00:01:02.000', 0);"
);
taos_query
(
taos
,
"insert into t8 values('2020-01-01 00:01:02.000', 0);"
);
taos_query
(
taos
,
"insert into t9 values('2020-01-01 00:01:02.000', 0);"
);
// super tables subscription
usleep
(
1000000
);
TAOS_SUB
*
tsub
=
taos_subscribe
(
taos
,
0
,
"test"
,
"select * from meters;"
,
NULL
,
NULL
,
0
);
TAOS_RES
*
res
=
taos_consume
(
tsub
);
...
...
@@ -90,23 +104,23 @@ void run_test(TAOS* taos) {
res
=
taos_consume
(
tsub
);
check_row_count
(
__LINE__
,
res
,
0
);
taos_query
(
taos
,
"insert into t0 values('2020-01-01 00:0
3:00.000', 0, 'china'
);"
);
taos_query
(
taos
,
"insert into t8 values('2020-01-01 00:01:03.000', 0
, 'china'
);"
);
taos_query
(
taos
,
"insert into t0 values('2020-01-01 00:0
2:00.001', 0
);"
);
taos_query
(
taos
,
"insert into t8 values('2020-01-01 00:01:03.000', 0);"
);
res
=
taos_consume
(
tsub
);
check_row_count
(
__LINE__
,
res
,
2
);
taos_query
(
taos
,
"insert into t2 values('2020-01-01 00:01:02.001', 0
, 'UK'
);"
);
taos_query
(
taos
,
"insert into t1 values('2020-01-01 00:03:00.001', 0
, 'UK'
);"
);
taos_query
(
taos
,
"insert into t2 values('2020-01-01 00:01:02.001', 0);"
);
taos_query
(
taos
,
"insert into t1 values('2020-01-01 00:03:00.001', 0);"
);
res
=
taos_consume
(
tsub
);
check_row_count
(
__LINE__
,
res
,
2
);
taos_query
(
taos
,
"insert into t1 values('2020-01-01 00:03:00.002', 0
, 'china'
);"
);
taos_query
(
taos
,
"insert into t1 values('2020-01-01 00:03:00.002', 0);"
);
res
=
taos_consume
(
tsub
);
check_row_count
(
__LINE__
,
res
,
1
);
// keep progress information and restart subscription
taos_unsubscribe
(
tsub
,
1
);
taos_query
(
taos
,
"insert into t0 values('2020-01-01 00:04:00.000', 0
, 'china'
);"
);
taos_query
(
taos
,
"insert into t0 values('2020-01-01 00:04:00.000', 0);"
);
tsub
=
taos_subscribe
(
taos
,
1
,
"test"
,
"select * from meters;"
,
NULL
,
NULL
,
0
);
res
=
taos_consume
(
tsub
);
check_row_count
(
__LINE__
,
res
,
24
);
...
...
@@ -133,7 +147,7 @@ void run_test(TAOS* taos) {
res
=
taos_consume
(
tsub
);
check_row_count
(
__LINE__
,
res
,
0
);
taos_query
(
taos
,
"insert into t0 values('2020-01-01 00:04:00.001', 0
, 'china'
);"
);
taos_query
(
taos
,
"insert into t0 values('2020-01-01 00:04:00.001', 0);"
);
res
=
taos_consume
(
tsub
);
check_row_count
(
__LINE__
,
res
,
1
);
...
...
@@ -197,7 +211,7 @@ int main(int argc, char *argv[]) {
// init TAOS
taos_init
();
TAOS
*
taos
=
taos_connect
(
host
,
user
,
passwd
,
"
test
"
,
0
);
TAOS
*
taos
=
taos_connect
(
host
,
user
,
passwd
,
""
,
0
);
if
(
taos
==
NULL
)
{
printf
(
"failed to connect to db, reason:%s
\n
"
,
taos_errstr
(
taos
));
exit
(
1
);
...
...
@@ -209,6 +223,7 @@ int main(int argc, char *argv[]) {
exit
(
0
);
}
taos_query
(
taos
,
"use test;"
);
TAOS_SUB
*
tsub
=
NULL
;
if
(
async
)
{
// create an asynchronized subscription, the callback function will be called every 1s
...
...
tests/script/general/cache/new_metrics.sim
浏览文件 @
bdc235b4
...
...
@@ -27,7 +27,7 @@ sql create table $mt (ts timestamp, tbcol int) TAGS(tgcol bool)
$i = 0
while $i < 5
$tb = $tbPrefix . $i
sql create table $tb using $mt tags(
0
)
sql create table $tb using $mt tags(
$i
)
$x = 0
while $x < $rowNum
$val = $x * 60000
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录