Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
0b6fd5c2
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
0b6fd5c2
编写于
1月 04, 2022
作者:
H
Haojun Liao
提交者:
GitHub
1月 04, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #8821 from taosdata/fix/TS-530
Fix/TS-530 CQ Support write result to superTable
上级
8f7f5ed4
66fd6554
变更
28
展开全部
隐藏空白更改
内联
并排
Showing
28 changed file
with
2277 addition
and
2067 deletion
+2277
-2067
Jenkinsfile
Jenkinsfile
+4
-0
src/client/inc/tscUtil.h
src/client/inc/tscUtil.h
+8
-0
src/client/inc/tsclient.h
src/client/inc/tsclient.h
+6
-0
src/client/src/taos.def
src/client/src/taos.def
+1
-0
src/client/src/tscSQLParser.c
src/client/src/tscSQLParser.c
+15
-0
src/client/src/tscServer.c
src/client/src/tscServer.c
+87
-17
src/client/src/tscSql.c
src/client/src/tscSql.c
+122
-1
src/client/src/tscStream.c
src/client/src/tscStream.c
+384
-35
src/common/inc/tglobal.h
src/common/inc/tglobal.h
+1
-1
src/common/src/tglobal.c
src/common/src/tglobal.c
+4
-4
src/cq/src/cqMain.c
src/cq/src/cqMain.c
+6
-4
src/inc/taos.h
src/inc/taos.h
+5
-1
src/inc/trpc.h
src/inc/trpc.h
+1
-0
src/inc/ttokendef.h
src/inc/ttokendef.h
+81
-80
src/mnode/src/mnodeTable.c
src/mnode/src/mnodeTable.c
+16
-6
src/query/inc/qSqlparser.h
src/query/inc/qSqlparser.h
+5
-0
src/query/inc/sql.y
src/query/inc/sql.y
+15
-1
src/query/src/qSqlParser.c
src/query/src/qSqlParser.c
+5
-0
src/query/src/queryMain.c
src/query/src/queryMain.c
+2
-1
src/query/src/sql.c
src/query/src/sql.c
+1451
-1904
src/rpc/src/rpcMain.c
src/rpc/src/rpcMain.c
+6
-0
src/util/inc/tidpool.h
src/util/inc/tidpool.h
+2
-0
src/util/inc/ttoken.h
src/util/inc/ttoken.h
+9
-0
src/util/src/tidpool.c
src/util/src/tidpool.c
+13
-1
src/util/src/ttokenizer.c
src/util/src/ttokenizer.c
+14
-0
tests/pytest/stream/metric_1.py
tests/pytest/stream/metric_1.py
+3
-2
tests/pytest/stream/new.py
tests/pytest/stream/new.py
+10
-8
tests/pytest/stream/sys.py
tests/pytest/stream/sys.py
+1
-1
未找到文件。
Jenkinsfile
浏览文件 @
0b6fd5c2
...
...
@@ -67,6 +67,7 @@ def pre_test(){
}
sh
'''
cd ${WKC}
git remote prune origin
git pull >/dev/null
git fetch origin +refs/pull/${CHANGE_ID}/merge
git checkout -qf FETCH_HEAD
...
...
@@ -140,6 +141,7 @@ def pre_test_noinstall(){
}
sh
'''
cd ${WKC}
git remote prune origin
git pull >/dev/null
git fetch origin +refs/pull/${CHANGE_ID}/merge
git checkout -qf FETCH_HEAD
...
...
@@ -210,6 +212,7 @@ def pre_test_ningsi(){
}
sh
'''
cd ${WKC}
git remote prune origin
git pull >/dev/null
git fetch origin +refs/pull/${CHANGE_ID}/merge
git checkout -qf FETCH_HEAD
...
...
@@ -284,6 +287,7 @@ def pre_test_win(){
}
bat
'''
cd C:\\workspace\\TDinternal\\community
git remote prune origin
git pull
git fetch origin +refs/pull/%CHANGE_ID%/merge
git checkout -qf FETCH_HEAD
...
...
src/client/inc/tscUtil.h
浏览文件 @
0b6fd5c2
...
...
@@ -29,6 +29,14 @@ extern "C" {
#include "tsched.h"
#include "tsclient.h"
#define LABEL_SQL "sql:"
#define LABEL_TO " to:"
#define LABEL_SPLIT " split:"
#define LABEL_SQL_LEN (sizeof(LABEL_SQL) - 1)
#define LABEL_TO_LEN (sizeof(LABEL_TO) - 1)
#define LABEL_SPLIT_LEN (sizeof(LABEL_SPLIT) - 1)
#define UTIL_TABLE_IS_SUPER_TABLE(metaInfo) \
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_SUPER_TABLE))
...
...
src/client/inc/tsclient.h
浏览文件 @
0b6fd5c2
...
...
@@ -406,6 +406,10 @@ typedef struct SSqlStream {
int16_t
precision
;
int64_t
num
;
// number of computing count
int32_t
dstCols
;
// dstTable has number of columns
char
*
to
;
char
*
split
;
/*
* keep the number of current result in computing,
* the value will be set to 0 before set timer for next computing
...
...
@@ -482,6 +486,8 @@ TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port,
void
*
param
,
TAOS
**
taos
);
TAOS_RES
*
taos_query_h
(
TAOS
*
taos
,
const
char
*
sqlstr
,
int64_t
*
res
);
TAOS_RES
*
taos_query_ra
(
TAOS
*
taos
,
const
char
*
sqlstr
,
__async_cb_func_t
fp
,
void
*
param
);
// get taos connection unused session number
int32_t
taos_unused_session
(
TAOS
*
taos
);
void
waitForQueryRsp
(
void
*
param
,
TAOS_RES
*
tres
,
int
code
);
...
...
src/client/src/taos.def
浏览文件 @
0b6fd5c2
...
...
@@ -50,3 +50,4 @@ taos_stmt_bind_param_batch
taos_stmt_bind_single_param_batch
taos_is_null
taos_insert_lines
taos_print_row_ex
\ No newline at end of file
src/client/src/tscSQLParser.c
浏览文件 @
0b6fd5c2
...
...
@@ -1350,6 +1350,7 @@ int32_t tscSetTableFullName(SName* pName, SStrToken* pTableName, SSqlObj* pSql)
const
char
*
msg3
=
"no acctId"
;
const
char
*
msg4
=
"db name too long"
;
const
char
*
msg5
=
"table name too long"
;
const
char
*
msg6
=
"table name empty"
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
...
...
@@ -1396,6 +1397,8 @@ int32_t tscSetTableFullName(SName* pName, SStrToken* pTableName, SSqlObj* pSql)
if
(
pTableName
->
n
>=
TSDB_TABLE_NAME_LEN
)
{
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg1
);
}
else
if
(
pTableName
->
n
==
0
)
{
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg6
);
}
char
name
[
TSDB_TABLE_FNAME_LEN
]
=
{
0
};
...
...
@@ -7803,6 +7806,18 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
if
(
tscValidateName
(
pName
)
!=
TSDB_CODE_SUCCESS
)
{
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg1
);
}
// check to valid and create to name
if
(
pInfo
->
pCreateTableInfo
->
to
.
n
>
0
)
{
if
(
tscValidateName
(
&
pInfo
->
pCreateTableInfo
->
to
)
!=
TSDB_CODE_SUCCESS
)
{
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg1
);
}
int32_t
code
=
tscSetTableFullName
(
&
pInfo
->
pCreateTableInfo
->
toSName
,
&
pInfo
->
pCreateTableInfo
->
to
,
pSql
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
}
SRelationInfo
*
pFromInfo
=
pInfo
->
pCreateTableInfo
->
pSelect
->
from
;
if
(
pFromInfo
==
NULL
||
taosArrayGetSize
(
pFromInfo
->
list
)
==
0
)
{
...
...
src/client/src/tscServer.c
浏览文件 @
0b6fd5c2
...
...
@@ -1630,11 +1630,49 @@ int tscEstimateCreateTableMsgLength(SSqlObj *pSql, SSqlInfo *pInfo) {
if
(
pCreateTableInfo
->
pSelect
!=
NULL
)
{
size
+=
(
pCreateTableInfo
->
pSelect
->
sqlstr
.
n
+
1
);
// add size = create super table with same columns and 1 tags
if
(
pCreateTableInfo
->
to
.
n
>
0
)
{
size
+=
sizeof
(
SCreateTableMsg
);
size
+=
sizeof
(
SSchema
)
*
(
pCmd
->
numOfCols
+
1
);
size
+=
pCreateTableInfo
->
to
.
n
+
4
;
// to:
if
(
pCreateTableInfo
->
split
.
n
>
0
)
size
+=
pCreateTableInfo
->
split
.
n
+
7
;
// split:
}
}
return
size
+
TSDB_EXTRA_PAYLOAD_SIZE
;
}
char
*
fillCreateSTableMsg
(
SCreateTableMsg
*
pCreateMsg
,
SCreateTableSql
*
pTableSql
,
SSqlCmd
*
pCmd
,
SSqlInfo
*
pInfo
)
{
// SET
SSchema
*
pSchema
=
(
SSchema
*
)
pCreateMsg
->
schema
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
);
pCreateMsg
->
igExists
=
0
;
pCreateMsg
->
sqlLen
=
0
;
// FullName like acctID.dbName.tableName
tNameExtractFullName
(
&
pInfo
->
pCreateTableInfo
->
toSName
,
pCreateMsg
->
tableName
);
// copy columns
pCreateMsg
->
numOfColumns
=
htons
(
pCmd
->
numOfCols
);
for
(
int
i
=
0
;
i
<
pCmd
->
numOfCols
;
++
i
)
{
TAOS_FIELD
*
pField
=
tscFieldInfoGetField
(
&
pQueryInfo
->
fieldsInfo
,
i
);
pSchema
->
type
=
pField
->
type
;
strcpy
(
pSchema
->
name
,
pField
->
name
);
pSchema
->
bytes
=
htons
(
pField
->
bytes
);
pSchema
++
;
}
// append one tag
pCreateMsg
->
numOfTags
=
htons
(
1
);
// only one tag immutable
pSchema
->
type
=
TSDB_DATA_TYPE_INT
;
pSchema
->
bytes
=
htons
(
INT_BYTES
);
strcpy
(
pSchema
->
name
,
"tag1"
);
pSchema
++
;
return
(
char
*
)
pSchema
;
}
int
tscBuildCreateTableMsg
(
SSqlObj
*
pSql
,
SSqlInfo
*
pInfo
)
{
int
msgLen
=
0
;
int
size
=
0
;
...
...
@@ -1682,39 +1720,71 @@ int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pCreate
->
len
=
htonl
(
len
);
}
}
else
{
// create (super) table
// FIRST MSG
SCreateTableMsg
*
pCreate
=
pCreateMsg
;
pCreateTableMsg
->
numOfTables
=
htonl
(
1
);
// only one table will be created
int32_t
code
=
tNameExtractFullName
(
&
pTableMetaInfo
->
name
,
pCreateMsg
->
tableName
)
;
int32_t
code
=
tNameExtractFullName
(
&
pTableMetaInfo
->
name
,
pCreate
->
tableName
);
bool
to
=
pInfo
->
pCreateTableInfo
->
to
.
n
>
0
;
assert
(
code
==
0
);
SCreateTableSql
*
pCreateTable
=
pInfo
->
pCreateTableInfo
;
pCreateMsg
->
igExists
=
pCreateTable
->
existCheck
?
1
:
0
;
pCreateMsg
->
numOfColumns
=
htons
(
pCmd
->
numOfCols
);
pCreateMsg
->
numOfTags
=
htons
(
pCmd
->
count
);
pCreateMsg
->
sqlLen
=
0
;
pMsg
=
(
char
*
)
pCreateMsg
->
schema
;
pSchema
=
(
SSchema
*
)
pCreateMsg
->
schema
;
pCreate
->
igExists
=
pCreateTable
->
existCheck
?
1
:
0
;
pCreate
->
numOfColumns
=
htons
(
pCmd
->
numOfCols
);
pCreate
->
numOfTags
=
htons
(
pCmd
->
count
);
pCreate
->
sqlLen
=
0
;
pSchema
=
(
SSchema
*
)
pCreate
->
schema
;
//copy schema
for
(
int
i
=
0
;
i
<
pCmd
->
numOfCols
+
pCmd
->
count
;
++
i
)
{
TAOS_FIELD
*
pField
=
tscFieldInfoGetField
(
&
pQueryInfo
->
fieldsInfo
,
i
);
pSchema
->
type
=
pField
->
type
;
strcpy
(
pSchema
->
name
,
pField
->
name
);
pSchema
->
bytes
=
htons
(
pField
->
bytes
);
pSchema
++
;
}
//copy stream sql if have
pMsg
=
(
char
*
)
pSchema
;
if
(
type
==
TSQL_CREATE_STREAM
)
{
// check if it is a stream sql
SSqlNode
*
pQuerySql
=
pInfo
->
pCreateTableInfo
->
pSelect
;
int16_t
len
=
0
;
if
(
to
)
{
//sql:
strcpy
(
pMsg
,
LABEL_SQL
);
len
+=
LABEL_SQL_LEN
;
len
+=
tStrNCpy
(
pMsg
+
len
,
&
pQuerySql
->
sqlstr
);
//to
strcpy
(
pMsg
+
len
,
LABEL_TO
);
len
+=
LABEL_TO_LEN
;
len
+=
tStrNCpy
(
pMsg
+
len
,
&
pInfo
->
pCreateTableInfo
->
to
);
//split if
if
(
pInfo
->
pCreateTableInfo
->
split
.
n
>
0
)
{
strcpy
(
pMsg
+
len
,
LABEL_SPLIT
);
len
+=
LABEL_SPLIT_LEN
;
len
+=
tStrNCpy
(
pMsg
+
len
,
&
pInfo
->
pCreateTableInfo
->
split
);
}
// append string end flag
pMsg
[
len
++
]
=
0
;
pMsg
+=
len
;
pCreate
->
sqlLen
=
htons
(
len
);
}
else
{
len
=
pQuerySql
->
sqlstr
.
n
;
strncpy
(
pMsg
,
pQuerySql
->
sqlstr
.
z
,
len
);
pMsg
[
len
++
]
=
0
;
// string end
pMsg
+=
len
;
pCreate
->
sqlLen
=
htons
(
len
);
}
}
// calc first msg length
int32_t
len
=
(
int32_t
)(
pMsg
-
(
char
*
)
pCreate
);
pCreate
->
len
=
htonl
(
len
);
strncpy
(
pMsg
,
pQuerySql
->
sqlstr
.
z
,
pQuerySql
->
sqlstr
.
n
+
1
);
pCreateMsg
->
sqlLen
=
htons
(
pQuerySql
->
sqlstr
.
n
+
1
);
pMsg
+=
pQuerySql
->
sqlstr
.
n
+
1
;
// filling second msg if to have value
if
(
to
)
{
pCreate
=
(
SCreateTableMsg
*
)
pMsg
;
pMsg
=
fillCreateSTableMsg
(
pCreate
,
pCreateTable
,
pCmd
,
pInfo
);
len
=
(
int32_t
)(
pMsg
-
(
char
*
)
pCreate
);
pCreate
->
len
=
htonl
(
len
);
pCreateTableMsg
->
numOfTables
=
htonl
(
2
);
}
}
...
...
src/client/src/tscSql.c
浏览文件 @
0b6fd5c2
...
...
@@ -28,6 +28,7 @@
#include "tutil.h"
#include "ttimer.h"
#include "tscProfile.h"
#include "tidpool.h"
static
bool
validImpl
(
const
char
*
str
,
size_t
maxsize
)
{
if
(
str
==
NULL
)
{
...
...
@@ -306,6 +307,25 @@ void taos_close(TAOS *taos) {
taosRemoveRef
(
tscRefId
,
pObj
->
rid
);
}
// get taos connection unused session number
int32_t
taos_unused_session
(
TAOS
*
taos
)
{
// param valid check
STscObj
*
pObj
=
(
STscObj
*
)
taos
;
if
(
pObj
==
NULL
||
pObj
->
signature
!=
pObj
)
{
tscError
(
"pObj:%p is NULL or freed"
,
pObj
);
terrno
=
TSDB_CODE_TSC_DISCONNECTED
;
return
0
;
}
if
(
pObj
->
pRpcObj
==
NULL
)
{
tscError
(
"pObj:%p pRpcObj is NULL."
,
pObj
);
terrno
=
TSDB_CODE_TSC_DISCONNECTED
;
return
0
;
}
// get number
return
rpcUnusedSession
(
pObj
->
pRpcObj
->
pDnodeConn
,
false
);
}
void
waitForQueryRsp
(
void
*
param
,
TAOS_RES
*
tres
,
int
code
)
{
assert
(
tres
!=
NULL
);
...
...
@@ -772,11 +792,15 @@ bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) {
}
int
taos_print_row
(
char
*
str
,
TAOS_ROW
row
,
TAOS_FIELD
*
fields
,
int
num_fields
)
{
return
taos_print_row_ex
(
str
,
row
,
fields
,
num_fields
,
' '
,
false
);
}
int
taos_print_row_ex
(
char
*
str
,
TAOS_ROW
row
,
TAOS_FIELD
*
fields
,
int
num_fields
,
char
split
,
bool
addQuota
)
{
int
len
=
0
;
for
(
int
i
=
0
;
i
<
num_fields
;
++
i
)
{
if
(
i
>
0
)
{
str
[
len
++
]
=
' '
;
str
[
len
++
]
=
split
;
}
if
(
row
[
i
]
==
NULL
)
{
...
...
@@ -837,9 +861,23 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields)
}
else
{
assert
(
charLen
<=
fields
[
i
].
bytes
*
TSDB_NCHAR_SIZE
&&
charLen
>=
0
);
}
// add pre quotaion if require
if
(
addQuota
)
{
*
(
str
+
len
)
=
'\''
;
len
+=
1
;
}
// copy content
memcpy
(
str
+
len
,
row
[
i
],
charLen
);
len
+=
charLen
;
// add end quotaion if require
if
(
addQuota
)
{
*
(
str
+
len
)
=
'\''
;
len
+=
1
;
}
}
break
;
case
TSDB_DATA_TYPE_TIMESTAMP
:
...
...
@@ -856,6 +894,89 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields)
return
len
;
}
// print field value to str
int
taos_print_field
(
char
*
str
,
void
*
value
,
TAOS_FIELD
*
field
)
{
// check valid
if
(
str
==
NULL
||
value
==
NULL
||
field
==
NULL
)
{
return
0
;
}
// get value
int
len
=
0
;
switch
(
field
->
type
)
{
//
// fixed length
//
case
TSDB_DATA_TYPE_TINYINT
:
len
=
sprintf
(
str
,
"%d"
,
*
((
int8_t
*
)
value
));
break
;
case
TSDB_DATA_TYPE_UTINYINT
:
len
=
sprintf
(
str
,
"%u"
,
*
((
uint8_t
*
)
value
));
break
;
case
TSDB_DATA_TYPE_SMALLINT
:
len
=
sprintf
(
str
,
"%d"
,
*
((
int16_t
*
)
value
));
break
;
case
TSDB_DATA_TYPE_USMALLINT
:
len
=
sprintf
(
str
,
"%u"
,
*
((
uint16_t
*
)
value
));
break
;
case
TSDB_DATA_TYPE_INT
:
len
=
sprintf
(
str
,
"%d"
,
*
((
int32_t
*
)
value
));
break
;
case
TSDB_DATA_TYPE_UINT
:
len
=
sprintf
(
str
,
"%u"
,
*
((
uint32_t
*
)
value
));
break
;
case
TSDB_DATA_TYPE_BIGINT
:
len
=
sprintf
(
str
,
"%"
PRId64
,
*
((
int64_t
*
)
value
));
break
;
case
TSDB_DATA_TYPE_UBIGINT
:
len
=
sprintf
(
str
,
"%"
PRIu64
,
*
((
uint64_t
*
)
value
));
break
;
case
TSDB_DATA_TYPE_FLOAT
:
{
float
fv
=
0
;
fv
=
GET_FLOAT_VAL
(
value
);
len
=
sprintf
(
str
,
"%f"
,
fv
);
}
break
;
case
TSDB_DATA_TYPE_DOUBLE
:
{
double
dv
=
0
;
dv
=
GET_DOUBLE_VAL
(
value
);
len
=
sprintf
(
str
,
"%lf"
,
dv
);
}
break
;
case
TSDB_DATA_TYPE_TIMESTAMP
:
len
=
sprintf
(
str
,
"%"
PRId64
,
*
((
int64_t
*
)
value
));
break
;
case
TSDB_DATA_TYPE_BOOL
:
len
=
sprintf
(
str
,
"%d"
,
*
((
int8_t
*
)
value
));
//
// variant length
//
case
TSDB_DATA_TYPE_BINARY
:
case
TSDB_DATA_TYPE_NCHAR
:
{
len
=
varDataLen
((
char
*
)
value
-
VARSTR_HEADER_SIZE
);
if
(
field
->
type
==
TSDB_DATA_TYPE_BINARY
)
{
assert
(
len
<=
field
->
bytes
&&
len
>=
0
);
}
else
{
assert
(
len
<=
field
->
bytes
*
TSDB_NCHAR_SIZE
&&
len
>=
0
);
}
memcpy
(
str
,
value
,
len
);
}
break
;
default:
break
;
}
return
len
;
}
static
void
asyncCallback
(
void
*
param
,
TAOS_RES
*
tres
,
int
code
)
{
assert
(
param
!=
NULL
);
SSqlObj
*
pSql
=
((
SSqlObj
*
)
param
);
...
...
src/client/src/tscStream.c
浏览文件 @
0b6fd5c2
此差异已折叠。
点击以展开。
src/common/inc/tglobal.h
浏览文件 @
0b6fd5c2
...
...
@@ -78,7 +78,7 @@ extern int32_t tsMaxNumOfOrderedResults;
extern
int32_t
tsMinSlidingTime
;
extern
int32_t
tsMinIntervalTime
;
extern
int32_t
tsMaxStreamComputDelay
;
extern
int32_t
ts
StreamCompStart
Delay
;
extern
int32_t
ts
FirstLaunch
Delay
;
extern
int32_t
tsRetryStreamCompDelay
;
extern
float
tsStreamComputDelayRatio
;
// the delayed computing ration of the whole time window
extern
int32_t
tsProjectExecInterval
;
...
...
src/common/src/tglobal.c
浏览文件 @
0b6fd5c2
...
...
@@ -98,11 +98,11 @@ int32_t tsMinIntervalTime = 1;
// 20sec, the maximum value of stream computing delay, changed accordingly
int32_t
tsMaxStreamComputDelay
=
20000
;
// 10sec, the
first stream computing
delay time after system launched successfully, changed accordingly
int32_t
ts
StreamCompStart
Delay
=
10000
;
// 10sec, the
stream first launched to execute
delay time after system launched successfully, changed accordingly
int32_t
ts
FirstLaunch
Delay
=
10000
;
// the stream computing delay time after executing failed, change accordingly
int32_t
tsRetryStreamCompDelay
=
1
0
*
1000
;
int32_t
tsRetryStreamCompDelay
=
30
*
6
0
*
1000
;
// The delayed computing ration. 10% of the whole computing time window by default.
float
tsStreamComputDelayRatio
=
0
.
1
f
;
...
...
@@ -755,7 +755,7 @@ static void doInitGlobalConfig(void) {
taosInitConfigOption
(
cfg
);
cfg
.
option
=
"maxFirstStreamCompDelay"
;
cfg
.
ptr
=
&
ts
StreamCompStartDelay
;
cfg
.
ptr
=
&
ts
FirstLaunchDelay
;
// stream first launch delay time
cfg
.
valType
=
TAOS_CFG_VTYPE_INT32
;
cfg
.
cfgType
=
TSDB_CFG_CTYPE_B_CONFIG
|
TSDB_CFG_CTYPE_B_SHOW
;
cfg
.
minValue
=
1000
;
...
...
src/cq/src/cqMain.c
浏览文件 @
0b6fd5c2
...
...
@@ -423,8 +423,8 @@ static void cqProcessCreateTimer(void *param, void *tmrId) {
}
// inner implement in tscStream.c
TAOS_STREAM
*
taos_open_stream_withname
(
TAOS
*
taos
,
const
char
*
desName
,
const
char
*
sqlstr
,
void
(
*
fp
)(
void
*
param
,
TAOS_RES
*
,
TAOS_ROW
row
),
int64_t
tsc_
stime
,
void
*
param
,
void
(
*
callback
)(
void
*
),
void
*
cqhandle
);
TAOS_STREAM
*
taos_open_stream_withname
(
TAOS
*
taos
,
const
char
*
desName
,
int32_t
dstCols
,
const
char
*
sqlstr
,
void
(
*
fp
)(
void
*
param
,
TAOS_RES
*
,
TAOS_ROW
row
),
int64_t
stime
,
void
*
param
,
void
(
*
callback
)(
void
*
),
void
*
cqhandle
);
static
void
cqCreateStream
(
SCqContext
*
pContext
,
SCqObj
*
pObj
)
{
pObj
->
pContext
=
pContext
;
...
...
@@ -436,9 +436,11 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
}
pObj
->
tmrId
=
0
;
int32_t
dstCols
=
-
1
;
if
(
pObj
->
pSchema
)
dstCols
=
pObj
->
pSchema
->
numOfCols
;
if
(
pObj
->
pStream
==
NULL
)
{
pObj
->
pStream
=
taos_open_stream_withname
(
pContext
->
dbConn
,
pObj
->
dstTable
,
pObj
->
sqlStr
,
cqProcessStreamRes
,
\
pObj
->
pStream
=
taos_open_stream_withname
(
pContext
->
dbConn
,
pObj
->
dstTable
,
dstCols
,
pObj
->
sqlStr
,
cqProcessStreamRes
,
\
INT64_MIN
,
(
void
*
)
pObj
->
rid
,
NULL
,
pContext
);
// TODO the pObj->pStream may be released if error happens
...
...
src/inc/taos.h
浏览文件 @
0b6fd5c2
...
...
@@ -137,7 +137,11 @@ DLL_EXPORT int taos_num_fields(TAOS_RES *res);
DLL_EXPORT
int
taos_affected_rows
(
TAOS_RES
*
res
);
DLL_EXPORT
TAOS_FIELD
*
taos_fetch_fields
(
TAOS_RES
*
res
);
DLL_EXPORT
int
taos_select_db
(
TAOS
*
taos
,
const
char
*
db
);
// row to string
DLL_EXPORT
int
taos_print_row
(
char
*
str
,
TAOS_ROW
row
,
TAOS_FIELD
*
fields
,
int
num_fields
);
DLL_EXPORT
int
taos_print_row_ex
(
char
*
str
,
TAOS_ROW
row
,
TAOS_FIELD
*
fields
,
int
num_fields
,
char
split
,
bool
addQuota
);
// one field to string
DLL_EXPORT
int
taos_print_field
(
char
*
str
,
void
*
value
,
TAOS_FIELD
*
field
);
DLL_EXPORT
void
taos_stop_query
(
TAOS_RES
*
res
);
DLL_EXPORT
bool
taos_is_null
(
TAOS_RES
*
res
,
int32_t
row
,
int32_t
col
);
DLL_EXPORT
int
taos_fetch_block
(
TAOS_RES
*
res
,
TAOS_ROW
*
rows
);
...
...
@@ -165,7 +169,7 @@ DLL_EXPORT TAOS_RES *taos_consume(TAOS_SUB *tsub);
DLL_EXPORT
void
taos_unsubscribe
(
TAOS_SUB
*
tsub
,
int
keepProgress
);
DLL_EXPORT
TAOS_STREAM
*
taos_open_stream
(
TAOS
*
taos
,
const
char
*
sql
,
void
(
*
fp
)(
void
*
param
,
TAOS_RES
*
,
TAOS_ROW
row
),
int64_t
stime
,
void
*
param
,
void
(
*
callback
)(
void
*
));
int64_t
tsc_
stime
,
void
*
param
,
void
(
*
callback
)(
void
*
));
DLL_EXPORT
void
taos_close_stream
(
TAOS_STREAM
*
tstr
);
DLL_EXPORT
int
taos_load_table_info
(
TAOS
*
taos
,
const
char
*
tableNameList
);
...
...
src/inc/trpc.h
浏览文件 @
0b6fd5c2
...
...
@@ -92,6 +92,7 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
void
rpcSendRecv
(
void
*
shandle
,
SRpcEpSet
*
pEpSet
,
SRpcMsg
*
pReq
,
SRpcMsg
*
pRsp
);
int
rpcReportProgress
(
void
*
pConn
,
char
*
pCont
,
int
contLen
);
void
rpcCancelRequest
(
int64_t
rid
);
int32_t
rpcUnusedSession
(
void
*
rpcInfo
,
bool
bLock
);
#ifdef __cplusplus
}
...
...
src/inc/ttokendef.h
浏览文件 @
0b6fd5c2
...
...
@@ -133,86 +133,87 @@
#define TK_UNSIGNED 115
#define TK_TAGS 116
#define TK_USING 117
#define TK_NULL 118
#define TK_NOW 119
#define TK_VARIABLE 120
#define TK_SELECT 121
#define TK_UNION 122
#define TK_ALL 123
#define TK_DISTINCT 124
#define TK_FROM 125
#define TK_INTERVAL 126
#define TK_EVERY 127
#define TK_SESSION 128
#define TK_STATE_WINDOW 129
#define TK_FILL 130
#define TK_SLIDING 131
#define TK_ORDER 132
#define TK_BY 133
#define TK_ASC 134
#define TK_GROUP 135
#define TK_HAVING 136
#define TK_LIMIT 137
#define TK_OFFSET 138
#define TK_SLIMIT 139
#define TK_SOFFSET 140
#define TK_WHERE 141
#define TK_RESET 142
#define TK_QUERY 143
#define TK_SYNCDB 144
#define TK_ADD 145
#define TK_COLUMN 146
#define TK_MODIFY 147
#define TK_TAG 148
#define TK_CHANGE 149
#define TK_SET 150
#define TK_KILL 151
#define TK_CONNECTION 152
#define TK_STREAM 153
#define TK_COLON 154
#define TK_ABORT 155
#define TK_AFTER 156
#define TK_ATTACH 157
#define TK_BEFORE 158
#define TK_BEGIN 159
#define TK_CASCADE 160
#define TK_CLUSTER 161
#define TK_CONFLICT 162
#define TK_COPY 163
#define TK_DEFERRED 164
#define TK_DELIMITERS 165
#define TK_DETACH 166
#define TK_EACH 167
#define TK_END 168
#define TK_EXPLAIN 169
#define TK_FAIL 170
#define TK_FOR 171
#define TK_IGNORE 172
#define TK_IMMEDIATE 173
#define TK_INITIALLY 174
#define TK_INSTEAD 175
#define TK_MATCH 176
#define TK_KEY 177
#define TK_OF 178
#define TK_RAISE 179
#define TK_REPLACE 180
#define TK_RESTRICT 181
#define TK_ROW 182
#define TK_STATEMENT 183
#define TK_TRIGGER 184
#define TK_VIEW 185
#define TK_SEMI 186
#define TK_NONE 187
#define TK_PREV 188
#define TK_LINEAR 189
#define TK_IMPORT 190
#define TK_TBNAME 191
#define TK_JOIN 192
#define TK_INSERT 193
#define TK_INTO 194
#define TK_VALUES 195
#define TK_FILE 196
#define TK_TO 118
#define TK_SPLIT 119
#define TK_NULL 120
#define TK_NOW 121
#define TK_VARIABLE 122
#define TK_SELECT 123
#define TK_UNION 124
#define TK_ALL 125
#define TK_DISTINCT 126
#define TK_FROM 127
#define TK_INTERVAL 128
#define TK_EVERY 129
#define TK_SESSION 130
#define TK_STATE_WINDOW 131
#define TK_FILL 132
#define TK_SLIDING 133
#define TK_ORDER 134
#define TK_BY 135
#define TK_ASC 136
#define TK_GROUP 137
#define TK_HAVING 138
#define TK_LIMIT 139
#define TK_OFFSET 140
#define TK_SLIMIT 141
#define TK_SOFFSET 142
#define TK_WHERE 143
#define TK_RESET 144
#define TK_QUERY 145
#define TK_SYNCDB 146
#define TK_ADD 147
#define TK_COLUMN 148
#define TK_MODIFY 149
#define TK_TAG 150
#define TK_CHANGE 151
#define TK_SET 152
#define TK_KILL 153
#define TK_CONNECTION 154
#define TK_STREAM 155
#define TK_COLON 156
#define TK_ABORT 157
#define TK_AFTER 158
#define TK_ATTACH 159
#define TK_BEFORE 160
#define TK_BEGIN 161
#define TK_CASCADE 162
#define TK_CLUSTER 163
#define TK_CONFLICT 164
#define TK_COPY 165
#define TK_DEFERRED 166
#define TK_DELIMITERS 167
#define TK_DETACH 168
#define TK_EACH 169
#define TK_END 170
#define TK_EXPLAIN 171
#define TK_FAIL 172
#define TK_FOR 173
#define TK_IGNORE 174
#define TK_IMMEDIATE 175
#define TK_INITIALLY 176
#define TK_INSTEAD 177
#define TK_MATCH 178
#define TK_KEY 179
#define TK_OF 180
#define TK_RAISE 181
#define TK_REPLACE 182
#define TK_RESTRICT 183
#define TK_ROW 184
#define TK_STATEMENT 185
#define TK_TRIGGER 186
#define TK_VIEW 187
#define TK_SEMI 188
#define TK_NONE 189
#define TK_PREV 190
#define TK_LINEAR 191
#define TK_IMPORT 192
#define TK_TBNAME 193
#define TK_JOIN 194
#define TK_INSERT 195
#define TK_INTO 196
#define TK_VALUES 197
#define TK_FILE 198
#define TK_SPACE 300
#define TK_COMMENT 301
...
...
src/mnode/src/mnodeTable.c
浏览文件 @
0b6fd5c2
...
...
@@ -845,15 +845,15 @@ static int32_t mnodeProcessBatchCreateTableMsg(SMnodeMsg *pMsg) {
memcpy
(
pSubMsg
->
pCont
+
sizeof
(
SCMCreateTableMsg
),
p
,
htonl
(
p
->
len
));
code
=
mnodeValidateCreateTableMsg
(
p
,
pSubMsg
);
if
(
code
==
TSDB_CODE_SUCCESS
||
code
==
TSDB_CODE_MND_TABLE_ALREADY_EXIST
)
{
++
pSubMsg
->
pBatchMasterMsg
->
successed
;
mnodeDestroySubMsg
(
pSubMsg
);
continue
;
if
(
code
==
TSDB_CODE_SUCCESS
||
(
p
->
igExists
==
1
&&
code
==
TSDB_CODE_MND_TABLE_ALREADY_EXIST
)
)
{
++
pSubMsg
->
pBatchMasterMsg
->
successed
;
mnodeDestroySubMsg
(
pSubMsg
);
continue
;
}
if
(
code
!=
TSDB_CODE_MND_ACTION_IN_PROGRESS
)
{
mnodeDestroySubMsg
(
pSubMsg
);
return
code
;
mnodeDestroySubMsg
(
pSubMsg
);
return
code
;
}
}
...
...
@@ -1045,11 +1045,21 @@ static int32_t mnodeCreateSuperTableCb(SMnodeMsg *pMsg, int32_t code) {
if
(
code
==
TSDB_CODE_SUCCESS
)
{
mLInfo
(
"stable:%s, is created in sdb, uid:%"
PRIu64
,
pTable
->
info
.
tableId
,
pTable
->
uid
);
if
(
pMsg
->
pBatchMasterMsg
)
pMsg
->
pBatchMasterMsg
->
successed
++
;
}
else
{
mError
(
"msg:%p, app:%p stable:%s, failed to create in sdb, reason:%s"
,
pMsg
,
pMsg
->
rpcMsg
.
ahandle
,
pTable
->
info
.
tableId
,
tstrerror
(
code
));
SSdbRow
desc
=
{.
type
=
SDB_OPER_GLOBAL
,
.
pObj
=
pTable
,
.
pTable
=
tsSuperTableSdb
};
sdbDeleteRow
(
&
desc
);
if
(
pMsg
->
pBatchMasterMsg
)
pMsg
->
pBatchMasterMsg
->
received
++
;
}
// if super table create by batch msg, check done and send finished to client
if
(
pMsg
->
pBatchMasterMsg
)
{
if
(
pMsg
->
pBatchMasterMsg
->
successed
+
pMsg
->
pBatchMasterMsg
->
received
>=
pMsg
->
pBatchMasterMsg
->
expected
)
dnodeSendRpcMWriteRsp
(
pMsg
->
pBatchMasterMsg
,
code
);
}
return
code
;
...
...
src/query/inc/qSqlparser.h
浏览文件 @
0b6fd5c2
...
...
@@ -25,6 +25,7 @@ extern "C" {
#include "tstrbuild.h"
#include "ttoken.h"
#include "tvariant.h"
#include "tname.h"
#define ParseTOKENTYPE SStrToken
...
...
@@ -139,8 +140,11 @@ typedef struct SCreatedTableInfo {
typedef
struct
SCreateTableSql
{
SStrToken
name
;
// table name, create table [name] xxx
SStrToken
to
;
// create stream to anohter table
SStrToken
split
;
// split columns
int8_t
type
;
// create normal table/from super table/ stream
bool
existCheck
;
SName
toSName
;
struct
{
SArray
*
pTagColumns
;
// SArray<TAOS_FIELD>
...
...
@@ -313,6 +317,7 @@ SArray *setSubclause(SArray *pList, void *pSqlNode);
SArray
*
appendSelectClause
(
SArray
*
pList
,
void
*
pSubclause
);
void
setCreatedTableName
(
SSqlInfo
*
pInfo
,
SStrToken
*
pTableNameToken
,
SStrToken
*
pIfNotExists
);
void
setCreatedStreamOpt
(
SSqlInfo
*
pInfo
,
SStrToken
*
pTo
,
SStrToken
*
pSplit
);
void
SqlInfoDestroy
(
SSqlInfo
*
pInfo
);
...
...
src/query/inc/sql.y
浏览文件 @
0b6fd5c2
...
...
@@ -413,14 +413,28 @@ tagNamelist(A) ::= ids(X). {A = taosArrayInit(4, sizeof(SSt
// create stream
// create table table_name as select count(*) from super_table_name interval(time)
create_table_args(A) ::= ifnotexists(U) ids(V) cpxName(Z) AS select(S). {
create_table_args(A) ::= ifnotexists(U) ids(V) cpxName(Z)
to_opt(E) split_opt(F)
AS select(S). {
A = tSetCreateTableInfo(NULL, NULL, S, TSQL_CREATE_STREAM);
setSqlInfo(pInfo, A, NULL, TSDB_SQL_CREATE_TABLE);
setCreatedStreamOpt(pInfo, &E, &F);
V.n += Z.n;
setCreatedTableName(pInfo, &V, &U);
}
// to_opt
%type to_opt {SStrToken}
to_opt(A) ::= . {A.n = 0;}
to_opt(A) ::= TO ids(X) cpxName(Y). {
A = X;
A.n += Y.n;
}
// split_opt
%type to_split {SStrToken}
split_opt(A) ::= . {A.n = 0;}
split_opt(A) ::= SPLIT ids(X). { A = X;}
%type column{TAOS_FIELD}
%type columnlist{SArray*}
%destructor columnlist {taosArrayDestroy($$);}
...
...
src/query/src/qSqlParser.c
浏览文件 @
0b6fd5c2
...
...
@@ -1009,6 +1009,11 @@ void setCreatedTableName(SSqlInfo *pInfo, SStrToken *pTableNameToken, SStrToken
pInfo
->
pCreateTableInfo
->
existCheck
=
(
pIfNotExists
->
n
!=
0
);
}
void
setCreatedStreamOpt
(
SSqlInfo
*
pInfo
,
SStrToken
*
pTo
,
SStrToken
*
pSplit
)
{
pInfo
->
pCreateTableInfo
->
to
=
*
pTo
;
pInfo
->
pCreateTableInfo
->
split
=
*
pSplit
;
}
void
setDCLSqlElems
(
SSqlInfo
*
pInfo
,
int32_t
type
,
int32_t
nParam
,
...)
{
pInfo
->
type
=
type
;
if
(
nParam
==
0
)
{
...
...
src/query/src/queryMain.c
浏览文件 @
0b6fd5c2
...
...
@@ -220,6 +220,7 @@ bool qTableQuery(qinfo_t qinfo, uint64_t *qId) {
if
(
isQueryKilled
(
pQInfo
))
{
qDebug
(
"QInfo:0x%"
PRIx64
" it is already killed, abort"
,
pQInfo
->
qId
);
setQueryKilled
(
pQInfo
);
pQInfo
->
runtimeEnv
.
outputBuf
=
NULL
;
return
doBuildResCheck
(
pQInfo
);
}
...
...
@@ -692,4 +693,4 @@ bool qSolveCommitNoBlock(void* pRepo, void* pMgmt) {
}
qWarn
(
"pRepo=%p solve problem failed."
,
pRepo
);
return
false
;
}
\ No newline at end of file
}
src/query/src/sql.c
浏览文件 @
0b6fd5c2
此差异已折叠。
点击以展开。
src/rpc/src/rpcMain.c
浏览文件 @
0b6fd5c2
...
...
@@ -1669,3 +1669,9 @@ static void rpcDecRef(SRpcInfo *pRpc)
}
}
int32_t
rpcUnusedSession
(
void
*
rpcInfo
,
bool
bLock
)
{
SRpcInfo
*
info
=
(
SRpcInfo
*
)
rpcInfo
;
if
(
info
==
NULL
)
return
0
;
return
taosIdPoolNumOfFree
(
info
->
idPool
,
bLock
);
}
\ No newline at end of file
src/util/inc/tidpool.h
浏览文件 @
0b6fd5c2
...
...
@@ -36,6 +36,8 @@ int taosIdPoolNumOfUsed(void *handle);
bool
taosIdPoolMarkStatus
(
void
*
handle
,
int
id
);
// get free count from pool , if bLock is true, locked pool than get free count, accuracy but slowly
int
taosIdPoolNumOfFree
(
void
*
handle
,
bool
bLock
);
#ifdef __cplusplus
}
#endif
...
...
src/util/inc/ttoken.h
浏览文件 @
0b6fd5c2
...
...
@@ -63,6 +63,15 @@ uint32_t tGetToken(char *z, uint32_t *tokenType);
*/
SStrToken
tStrGetToken
(
char
*
str
,
int32_t
*
i
,
bool
isPrevOptr
);
/**
* strcpy implement source from SStrToken
*
* @param dst copy to
* @param srcToken copy from
* @return size of copy successful bytes, not include '\0'
*/
int32_t
tStrNCpy
(
char
*
dst
,
SStrToken
*
srcToken
);
/**
* check if it is a keyword or not
* @param z
...
...
src/util/src/tidpool.c
浏览文件 @
0b6fd5c2
...
...
@@ -163,4 +163,16 @@ int taosIdPoolMaxSize(void *handle) {
pthread_mutex_unlock
(
&
pIdPool
->
mutex
);
return
ret
;
}
\ No newline at end of file
}
// get free count from pool , if bLock is true, locked pool than get free count, accuracy but slowly
int
taosIdPoolNumOfFree
(
void
*
handle
,
bool
bLock
)
{
id_pool_t
*
pIdPool
=
handle
;
if
(
bLock
)
pthread_mutex_lock
(
&
pIdPool
->
mutex
);
int
ret
=
pIdPool
->
numOfFree
;
if
(
bLock
)
pthread_mutex_unlock
(
&
pIdPool
->
mutex
);
return
ret
;
}
src/util/src/ttokenizer.c
浏览文件 @
0b6fd5c2
...
...
@@ -227,6 +227,8 @@ static SKeyword keywordTable[] = {
{
"OUTPUTTYPE"
,
TK_OUTPUTTYPE
},
{
"AGGREGATE"
,
TK_AGGREGATE
},
{
"BUFSIZE"
,
TK_BUFSIZE
},
{
"TO"
,
TK_TO
},
{
"SPLIT"
,
TK_SPLIT
},
};
static
const
char
isIdChar
[]
=
{
...
...
@@ -676,6 +678,18 @@ SStrToken tStrGetToken(char* str, int32_t* i, bool isPrevOptr) {
return
t0
;
}
/**
* strcpy implement source from SStrToken
*
* @param dst copy to
* @param srcToken copy from
* @return size of copy successful bytes
*/
int32_t
tStrNCpy
(
char
*
dst
,
SStrToken
*
srcToken
)
{
strncpy
(
dst
,
srcToken
->
z
,
srcToken
->
n
);
return
srcToken
->
n
;
}
bool
taosIsKeyWordToken
(
const
char
*
z
,
int32_t
len
)
{
return
(
tKeywordCode
((
char
*
)
z
,
len
)
!=
TK_ID
);
}
...
...
tests/pytest/stream/metric_1.py
浏览文件 @
0b6fd5c2
...
...
@@ -39,6 +39,7 @@ class TDTestCase:
def
run
(
self
):
tbNum
=
10
rowNum
=
20
ts_begin
=
1633017600000
tdSql
.
prepare
()
...
...
@@ -49,8 +50,8 @@ class TDTestCase:
tdSql
.
execute
(
"create table tb%d using stb tags(%d)"
%
(
i
,
i
))
for
j
in
range
(
rowNum
):
tdSql
.
execute
(
"insert into tb%d values (
now - %dm
, %d, %d)"
%
(
i
,
1440
-
j
,
j
,
j
))
"insert into tb%d values (
%d
, %d, %d)"
%
(
i
,
ts_begin
+
j
,
j
,
j
))
time
.
sleep
(
0.1
)
self
.
createFuncStream
(
"count(*)"
,
"c1"
,
200
)
...
...
tests/pytest/stream/new.py
浏览文件 @
0b6fd5c2
...
...
@@ -27,13 +27,14 @@ class TDTestCase:
def
run
(
self
):
rowNum
=
200
tdSql
.
prepare
()
ts_now
=
1633017600000
tdLog
.
info
(
"=============== step1"
)
tdSql
.
execute
(
"create table mt(ts timestamp, tbcol int, tbcol2 float) TAGS(tgcol int)"
)
for
i
in
range
(
5
):
tdSql
.
execute
(
"create table tb%d using mt tags(%d)"
%
(
i
,
i
))
for
j
in
range
(
rowNum
):
tdSql
.
execute
(
"insert into tb%d values(now + %ds, %d, %d)"
%
(
i
,
j
,
j
,
j
))
tdSql
.
execute
(
"insert into tb%d values(%d, %d, %d)"
%
(
i
,
ts_now
,
j
,
j
))
ts_now
+=
1000
time
.
sleep
(
0.1
)
tdLog
.
info
(
"=============== step2"
)
...
...
@@ -45,14 +46,15 @@ class TDTestCase:
tdSql
.
waitedQuery
(
"select * from st"
,
1
,
180
)
delay
=
int
(
time
.
time
()
-
start
)
+
80
v
=
tdSql
.
getData
(
0
,
3
)
if
v
>=
51
:
tdLog
.
exit
(
"value is %d,
which is larger than 51
"
%
v
)
if
v
!=
10
:
tdLog
.
exit
(
"value is %d,
expect is 10.
"
%
v
)
tdLog
.
info
(
"=============== step4"
)
for
i
in
range
(
5
,
10
):
tdSql
.
execute
(
"create table tb%d using mt tags(%d)"
%
(
i
,
i
))
for
j
in
range
(
rowNum
):
tdSql
.
execute
(
"insert into tb%d values(now + %ds, %d, %d)"
%
(
i
,
j
,
j
,
j
))
tdSql
.
execute
(
"insert into tb%d values(%d, %d, %d)"
%
(
i
,
ts_now
,
j
,
j
))
ts_now
+=
1000
tdLog
.
info
(
"=============== step5"
)
maxValue
=
0
...
...
@@ -62,11 +64,11 @@ class TDTestCase:
v
=
tdSql
.
getData
(
0
,
3
)
if
v
>
maxValue
:
maxValue
=
v
if
v
>
51
:
if
v
>
=
10
:
break
if
maxValue
<
=
51
:
tdLog
.
exit
(
"value is %d,
which is smaller than 51
"
%
maxValue
)
if
maxValue
<
10
:
tdLog
.
exit
(
"value is %d,
expect is 10
"
%
maxValue
)
def
stop
(
self
):
tdSql
.
close
()
...
...
tests/pytest/stream/sys.py
浏览文件 @
0b6fd5c2
...
...
@@ -47,7 +47,7 @@ class TDTestCase:
"select * from iostrm"
,
]
for
sql
in
sqls
:
(
rows
,
_
)
=
tdSql
.
waitedQuery
(
sql
,
1
,
24
0
)
(
rows
,
_
)
=
tdSql
.
waitedQuery
(
sql
,
1
,
60
0
)
if
rows
<
1
:
tdLog
.
exit
(
"failed: sql:%s, expect at least one row"
%
sql
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录