Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
8b56d495
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1193
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
8b56d495
编写于
11月 12, 2021
作者:
A
Alex Duan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fixed TS-530 jingdong . support CQ write to super table
上级
693aa8f3
变更
17
展开全部
显示空白变更内容
内联
并排
Showing
17 changed file
with
1897 addition
and
1439 deletion
+1897
-1439
src/client/inc/tscUtil.h
src/client/inc/tscUtil.h
+8
-0
src/client/inc/tsclient.h
src/client/inc/tsclient.h
+4
-0
src/client/src/taos.def
src/client/src/taos.def
+1
-0
src/client/src/tscSQLParser.c
src/client/src/tscSQLParser.c
+14
-0
src/client/src/tscServer.c
src/client/src/tscServer.c
+87
-17
src/client/src/tscSql.c
src/client/src/tscSql.c
+19
-1
src/client/src/tscStream.c
src/client/src/tscStream.c
+257
-7
src/cq/src/cqMain.c
src/cq/src/cqMain.c
+5
-3
src/inc/taos.h
src/inc/taos.h
+1
-0
src/inc/ttokendef.h
src/inc/ttokendef.h
+81
-79
src/mnode/src/mnodeTable.c
src/mnode/src/mnodeTable.c
+16
-6
src/query/inc/qSqlparser.h
src/query/inc/qSqlparser.h
+5
-0
src/query/inc/sql.y
src/query/inc/sql.y
+15
-1
src/query/src/qSqlParser.c
src/query/src/qSqlParser.c
+5
-0
src/query/src/sql.c
src/query/src/sql.c
+1356
-1325
src/util/inc/ttoken.h
src/util/inc/ttoken.h
+9
-0
src/util/src/ttokenizer.c
src/util/src/ttokenizer.c
+14
-0
未找到文件。
src/client/inc/tscUtil.h
浏览文件 @
8b56d495
...
...
@@ -29,6 +29,14 @@ extern "C" {
#include "tsched.h"
#include "tsclient.h"
#define LABEL_SQL "sql:"
#define LABEL_TO " to:"
#define LABEL_SPLIT " split:"
#define LABEL_SQL_LEN (sizeof(LABEL_SQL) - 1)
#define LABEL_TO_LEN (sizeof(LABEL_TO) - 1)
#define LABEL_SPLIT_LEN (sizeof(LABEL_SPLIT) - 1)
#define UTIL_TABLE_IS_SUPER_TABLE(metaInfo) \
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_SUPER_TABLE))
...
...
src/client/inc/tsclient.h
浏览文件 @
8b56d495
...
...
@@ -406,6 +406,10 @@ typedef struct SSqlStream {
int16_t
precision
;
int64_t
num
;
// number of computing count
int32_t
dstCols
;
// dstTable has number of columns
char
*
to
;
char
*
split
;
/*
* keep the number of current result in computing,
* the value will be set to 0 before set timer for next computing
...
...
src/client/src/taos.def
浏览文件 @
8b56d495
...
...
@@ -50,3 +50,4 @@ taos_stmt_bind_param_batch
taos_stmt_bind_single_param_batch
taos_is_null
taos_insert_lines
taos_print_row_ex
\ No newline at end of file
src/client/src/tscSQLParser.c
浏览文件 @
8b56d495
...
...
@@ -1341,6 +1341,7 @@ int32_t tscSetTableFullName(SName* pName, SStrToken* pTableName, SSqlObj* pSql)
const
char
*
msg3
=
"no acctId"
;
const
char
*
msg4
=
"db name too long"
;
const
char
*
msg5
=
"table name too long"
;
const
char
*
msg6
=
"table name empty"
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
...
...
@@ -1387,6 +1388,8 @@ int32_t tscSetTableFullName(SName* pName, SStrToken* pTableName, SSqlObj* pSql)
if
(
pTableName
->
n
>=
TSDB_TABLE_NAME_LEN
)
{
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg1
);
}
else
if
(
pTableName
->
n
==
0
)
{
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg6
);
}
char
name
[
TSDB_TABLE_FNAME_LEN
]
=
{
0
};
...
...
@@ -7740,6 +7743,17 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg1
);
}
// check to valid and create to name
if
(
pInfo
->
pCreateTableInfo
->
to
.
n
>
0
)
{
if
(
tscValidateName
(
&
pInfo
->
pCreateTableInfo
->
to
)
!=
TSDB_CODE_SUCCESS
)
{
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg1
);
}
int32_t
code
=
tscSetTableFullName
(
&
pInfo
->
pCreateTableInfo
->
toSName
,
&
pInfo
->
pCreateTableInfo
->
to
,
pSql
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
}
SRelationInfo
*
pFromInfo
=
pInfo
->
pCreateTableInfo
->
pSelect
->
from
;
if
(
pFromInfo
==
NULL
||
taosArrayGetSize
(
pFromInfo
->
list
)
==
0
)
{
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg6
);
...
...
src/client/src/tscServer.c
浏览文件 @
8b56d495
...
...
@@ -1626,11 +1626,49 @@ int tscEstimateCreateTableMsgLength(SSqlObj *pSql, SSqlInfo *pInfo) {
if
(
pCreateTableInfo
->
pSelect
!=
NULL
)
{
size
+=
(
pCreateTableInfo
->
pSelect
->
sqlstr
.
n
+
1
);
// add size = create super table with same columns and 1 tags
if
(
pCreateTableInfo
->
to
.
n
>
0
)
{
size
+=
sizeof
(
SCreateTableMsg
);
size
+=
sizeof
(
SSchema
)
*
(
pCmd
->
numOfCols
+
1
);
size
+=
pCreateTableInfo
->
to
.
n
+
4
;
// to:
if
(
pCreateTableInfo
->
split
.
n
>
0
)
size
+=
pCreateTableInfo
->
split
.
n
+
7
;
// split:
}
}
return
size
+
TSDB_EXTRA_PAYLOAD_SIZE
;
}
char
*
fillCreateSTableMsg
(
SCreateTableMsg
*
pCreateMsg
,
SCreateTableSql
*
pTableSql
,
SSqlCmd
*
pCmd
,
SSqlInfo
*
pInfo
)
{
// SET
SSchema
*
pSchema
=
(
SSchema
*
)
pCreateMsg
->
schema
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
);
pCreateMsg
->
igExists
=
0
;
pCreateMsg
->
sqlLen
=
0
;
// FullName like acctID.dbName.tableName
tNameExtractFullName
(
&
pInfo
->
pCreateTableInfo
->
toSName
,
pCreateMsg
->
tableName
);
// copy columns
pCreateMsg
->
numOfColumns
=
htons
(
pCmd
->
numOfCols
);
for
(
int
i
=
0
;
i
<
pCmd
->
numOfCols
;
++
i
)
{
TAOS_FIELD
*
pField
=
tscFieldInfoGetField
(
&
pQueryInfo
->
fieldsInfo
,
i
);
pSchema
->
type
=
pField
->
type
;
strcpy
(
pSchema
->
name
,
pField
->
name
);
pSchema
->
bytes
=
htons
(
pField
->
bytes
);
pSchema
++
;
}
// append one tag
pCreateMsg
->
numOfTags
=
htons
(
1
);
// only one tag immutable
pSchema
->
type
=
TSDB_DATA_TYPE_INT
;
pSchema
->
bytes
=
htons
(
INT_BYTES
);
strcpy
(
pSchema
->
name
,
"tag1"
);
pSchema
++
;
return
(
char
*
)
pSchema
;
}
int
tscBuildCreateTableMsg
(
SSqlObj
*
pSql
,
SSqlInfo
*
pInfo
)
{
int
msgLen
=
0
;
int
size
=
0
;
...
...
@@ -1678,39 +1716,71 @@ int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pCreate
->
len
=
htonl
(
len
);
}
}
else
{
// create (super) table
// FIRST MSG
SCreateTableMsg
*
pCreate
=
pCreateMsg
;
pCreateTableMsg
->
numOfTables
=
htonl
(
1
);
// only one table will be created
int32_t
code
=
tNameExtractFullName
(
&
pTableMetaInfo
->
name
,
pCreateMsg
->
tableName
)
;
int32_t
code
=
tNameExtractFullName
(
&
pTableMetaInfo
->
name
,
pCreate
->
tableName
);
bool
to
=
pInfo
->
pCreateTableInfo
->
to
.
n
>
0
;
assert
(
code
==
0
);
SCreateTableSql
*
pCreateTable
=
pInfo
->
pCreateTableInfo
;
pCreateMsg
->
igExists
=
pCreateTable
->
existCheck
?
1
:
0
;
pCreateMsg
->
numOfColumns
=
htons
(
pCmd
->
numOfCols
);
pCreateMsg
->
numOfTags
=
htons
(
pCmd
->
count
);
pCreateMsg
->
sqlLen
=
0
;
pMsg
=
(
char
*
)
pCreateMsg
->
schema
;
pSchema
=
(
SSchema
*
)
pCreateMsg
->
schema
;
pCreate
->
igExists
=
pCreateTable
->
existCheck
?
1
:
0
;
pCreate
->
numOfColumns
=
htons
(
pCmd
->
numOfCols
);
pCreate
->
numOfTags
=
htons
(
pCmd
->
count
);
pCreate
->
sqlLen
=
0
;
pSchema
=
(
SSchema
*
)
pCreate
->
schema
;
//copy schema
for
(
int
i
=
0
;
i
<
pCmd
->
numOfCols
+
pCmd
->
count
;
++
i
)
{
TAOS_FIELD
*
pField
=
tscFieldInfoGetField
(
&
pQueryInfo
->
fieldsInfo
,
i
);
pSchema
->
type
=
pField
->
type
;
strcpy
(
pSchema
->
name
,
pField
->
name
);
pSchema
->
bytes
=
htons
(
pField
->
bytes
);
pSchema
++
;
}
//copy stream sql if have
pMsg
=
(
char
*
)
pSchema
;
if
(
type
==
TSQL_CREATE_STREAM
)
{
// check if it is a stream sql
SSqlNode
*
pQuerySql
=
pInfo
->
pCreateTableInfo
->
pSelect
;
int16_t
len
=
0
;
if
(
to
)
{
//sql:
strcpy
(
pMsg
,
LABEL_SQL
);
len
+=
LABEL_SQL_LEN
;
len
+=
tStrNCpy
(
pMsg
+
len
,
&
pQuerySql
->
sqlstr
);
//to
strcpy
(
pMsg
+
len
,
LABEL_TO
);
len
+=
LABEL_TO_LEN
;
len
+=
tStrNCpy
(
pMsg
+
len
,
&
pInfo
->
pCreateTableInfo
->
to
);
//split if
if
(
pInfo
->
pCreateTableInfo
->
split
.
n
>
0
)
{
strcpy
(
pMsg
+
len
,
LABEL_SPLIT
);
len
+=
LABEL_SPLIT_LEN
;
len
+=
tStrNCpy
(
pMsg
+
len
,
&
pInfo
->
pCreateTableInfo
->
split
);
}
// append string end flag
pMsg
[
len
++
]
=
0
;
pMsg
+=
len
;
pCreate
->
sqlLen
=
htons
(
len
);
}
else
{
len
=
pQuerySql
->
sqlstr
.
n
;
strncpy
(
pMsg
,
pQuerySql
->
sqlstr
.
z
,
len
);
pMsg
[
len
++
]
=
0
;
// string end
pMsg
+=
len
;
pCreate
->
sqlLen
=
htons
(
len
);
}
}
// calc first msg length
int32_t
len
=
(
int32_t
)(
pMsg
-
(
char
*
)
pCreate
);
pCreate
->
len
=
htonl
(
len
);
strncpy
(
pMsg
,
pQuerySql
->
sqlstr
.
z
,
pQuerySql
->
sqlstr
.
n
+
1
);
pCreateMsg
->
sqlLen
=
htons
(
pQuerySql
->
sqlstr
.
n
+
1
);
pMsg
+=
pQuerySql
->
sqlstr
.
n
+
1
;
// filling second msg if to have value
if
(
to
)
{
pCreate
=
(
SCreateTableMsg
*
)
pMsg
;
pMsg
=
fillCreateSTableMsg
(
pCreate
,
pCreateTable
,
pCmd
,
pInfo
);
len
=
(
int32_t
)(
pMsg
-
(
char
*
)
pCreate
);
pCreate
->
len
=
htonl
(
len
);
pCreateTableMsg
->
numOfTables
=
htonl
(
2
);
}
}
...
...
src/client/src/tscSql.c
浏览文件 @
8b56d495
...
...
@@ -772,11 +772,15 @@ bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) {
}
int
taos_print_row
(
char
*
str
,
TAOS_ROW
row
,
TAOS_FIELD
*
fields
,
int
num_fields
)
{
return
taos_print_row_ex
(
str
,
row
,
fields
,
num_fields
,
' '
,
false
);
}
int
taos_print_row_ex
(
char
*
str
,
TAOS_ROW
row
,
TAOS_FIELD
*
fields
,
int
num_fields
,
char
split
,
bool
addQuota
)
{
int
len
=
0
;
for
(
int
i
=
0
;
i
<
num_fields
;
++
i
)
{
if
(
i
>
0
)
{
str
[
len
++
]
=
' '
;
str
[
len
++
]
=
split
;
}
if
(
row
[
i
]
==
NULL
)
{
...
...
@@ -838,8 +842,22 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields)
assert
(
charLen
<=
fields
[
i
].
bytes
*
TSDB_NCHAR_SIZE
&&
charLen
>=
0
);
}
// add pre quotaion if require
if
(
addQuota
)
{
*
(
str
+
len
)
=
'\''
;
len
+=
1
;
}
// copy content
memcpy
(
str
+
len
,
row
[
i
],
charLen
);
len
+=
charLen
;
// add end quotaion if require
if
(
addQuota
)
{
*
(
str
+
len
)
=
'\''
;
len
+=
1
;
}
}
break
;
case
TSDB_DATA_TYPE_TIMESTAMP
:
...
...
src/client/src/tscStream.c
浏览文件 @
8b56d495
...
...
@@ -265,9 +265,162 @@ static void tscStreamFillTimeGap(SSqlStream* pStream, TSKEY ts) {
#endif
}
// callback send values
void
cbSendValues
(
void
*
param
,
TAOS_RES
*
res
,
int
code
)
{
printf
(
" send values return code =%d
\n
"
,
code
);
}
// append values
size_t
appendValues
(
TAOS_FIELD
*
fields
,
int32_t
numCols
,
TAOS_ROW
row
,
char
*
pBuf
,
size_t
bufLen
,
size_t
curLen
,
bool
*
full
)
{
// calc buf is full
size_t
needLen
=
0
;
size_t
rowLen
=
0
;
int
i
;
for
(
i
=
0
;
i
<
numCols
;
i
++
)
{
needLen
+=
fields
[
i
].
bytes
;
}
// estimate length for use
needLen
+=
numCols
*
5
+
20
;
if
(
needLen
>=
bufLen
-
curLen
)
{
*
full
=
true
;
return
0
;
}
// row append to values
char
*
strRow
=
tmalloc
(
needLen
);
char
*
value
=
pBuf
+
curLen
;
strcpy
(
value
,
"("
);
rowLen
+=
taos_print_row_ex
(
strRow
,
row
,
fields
,
numCols
,
','
,
true
);
strcat
(
value
,
strRow
);
strcat
(
value
,
")"
);
rowLen
+=
2
;
tfree
(
strRow
);
return
rowLen
;
}
// send one table all rows for once
bool
sendChildTalbe
(
STscObj
*
pTscObj
,
char
*
superName
,
char
*
tableName
,
TAOS_FIELD
*
fields
,
int32_t
numCols
,
SArray
*
arr
)
{
int32_t
bufLen
=
TSDB_MAX_SQL_LEN
;
size_t
numRows
=
taosArrayGetSize
(
arr
);
if
(
numRows
==
0
)
return
false
;
if
(
numRows
<
50
)
bufLen
/=
20
;
else
if
(
numRows
<
500
)
bufLen
/=
5
;
else
bufLen
/=
2
;
char
dbName
[
TSDB_DB_NAME_LEN
]
=
""
;
char
fullTable
[
TSDB_TABLE_FNAME_LEN
];
// obtain dbname
char
*
p
=
strstr
(
superName
,
"."
);
if
(
p
)
{
// if have db prefix , under this db create table
int32_t
len
=
p
-
superName
;
strncpy
(
dbName
,
superName
,
len
);
dbName
[
len
]
=
0
;
// append str end
sprintf
(
fullTable
,
"%s.%s"
,
dbName
,
tableName
);
}
else
{
// no db prefix
strcpy
(
fullTable
,
tableName
);
}
char
*
pBuf
=
(
char
*
)
tmalloc
(
bufLen
);
sprintf
(
pBuf
,
"insert into %s using %s tags(0) values "
,
fullTable
,
superName
);
size_t
curLen
=
strlen
(
pBuf
);
TAOS_ROW
row
;
bool
full
=
false
;
for
(
size_t
i
=
0
;
i
<
numRows
;
i
++
)
{
row
=
(
TAOS_ROW
)
taosArrayGetP
(
arr
,
i
);
if
(
row
==
NULL
)
continue
;
curLen
+=
appendValues
(
fields
,
numCols
,
row
,
pBuf
,
bufLen
-
100
,
curLen
,
&
full
);
if
(
full
||
i
==
numRows
-
1
)
{
// need send
// send current
strcat
(
pBuf
,
";"
);
taos_query_a
(
pTscObj
,
pBuf
,
cbSendValues
,
NULL
);
// reset for next
if
(
full
)
{
sprintf
(
pBuf
,
"insert into %s.%s using %s tags(0) values "
,
dbName
,
tableName
,
superName
);
curLen
=
strlen
(
pBuf
);
// retry append. if full is true again, ignore this row
curLen
+=
appendValues
(
fields
,
numCols
,
row
,
pBuf
,
bufLen
-
100
,
curLen
,
&
full
);
full
=
false
;
// reset to false
}
}
tfree
(
row
);
}
tfree
(
pBuf
);
return
true
;
}
// write cq result to another table
bool
toAnotherTable
(
STscObj
*
pTscObj
,
char
*
superName
,
TAOS_FIELD
*
fields
,
int32_t
numCols
,
SHashObj
*
tbHash
)
{
void
*
pIter
=
taosHashIterate
(
tbHash
,
NULL
);
while
(
pIter
)
{
SArray
*
arr
=
*
(
SArray
**
)
pIter
;
if
(
arr
)
{
// get key as tableName
SHashNode
*
pNode
=
(
SHashNode
*
)
GET_HASH_PNODE
(
pIter
);
char
*
data
=
(
char
*
)
GET_HASH_NODE_KEY
(
pNode
);
uint32_t
len
=
pNode
->
keyLen
;
char
*
key
=
tmalloc
(
len
+
1
);
memcpy
(
key
,
data
,
len
);
key
[
len
]
=
0
;
// string end '\0'
// send all this table rows
sendChildTalbe
(
pTscObj
,
superName
,
key
,
fields
,
numCols
,
arr
);
// release SArray
taosArrayDestroy
(
arr
);
tfree
(
key
);
}
pIter
=
taosHashIterate
(
tbHash
,
pIter
);
}
return
true
;
}
// add row to hash to group by tbname
bool
tbHashAdd
(
SHashObj
*
tbHash
,
TAOS_ROW
row
,
TAOS_FIELD
*
fields
,
int32_t
idx
,
int32_t
numCols
)
{
void
*
v
=
row
[
idx
];
VarDataLenT
len
=
varDataLen
((
char
*
)
v
-
VARSTR_HEADER_SIZE
);
char
*
key
=
v
;
// get array point from hash
SArray
*
arr
=
NULL
;
// get arr from hash
void
*
pdata
=
taosHashGet
(
tbHash
,
key
,
len
);
if
(
pdata
)
{
arr
=
*
(
SArray
**
)
pdata
;
}
// get arr from new
if
(
arr
==
NULL
)
{
arr
=
(
SArray
*
)
taosArrayInit
(
10
,
sizeof
(
TAOS_ROW
));
if
(
arr
==
NULL
)
{
tscError
(
"tbHashAdd tbHash:%p, taosArrayInit(10,sizeof(TAOS_ROW) return NULL."
,
tbHash
);
return
false
;
}
taosHashPut
(
tbHash
,
key
,
len
,
&
arr
,
sizeof
(
SArray
*
));
}
// add to array
int32_t
new_len
=
sizeof
(
void
*
)
*
numCols
;
TAOS_ROW
new_row
=
(
TAOS_ROW
)
tmalloc
(
new_len
);
memcpy
(
new_row
,
row
,
new_len
);
taosArrayPush
(
arr
,
&
new_row
);
return
true
;
}
static
void
tscProcessStreamRetrieveResult
(
void
*
param
,
TAOS_RES
*
res
,
int
numOfRows
)
{
SSqlStream
*
pStream
=
(
SSqlStream
*
)
param
;
SSqlObj
*
pSql
=
(
SSqlObj
*
)
res
;
bool
toAnother
=
pStream
->
to
!=
NULL
;
if
(
pSql
==
NULL
||
numOfRows
<
0
)
{
int64_t
retryDelayTime
=
tscGetRetryDelayTime
(
pStream
,
pStream
->
interval
.
sliding
,
pStream
->
precision
);
...
...
@@ -281,18 +434,57 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
STableMetaInfo
*
pTableMetaInfo
=
pQueryInfo
->
pTableMetaInfo
[
0
];
if
(
numOfRows
>
0
)
{
// when reaching here the first execution of stream computing is successful.
// init hash
SHashObj
*
tbHash
=
NULL
;
int32_t
colIdx
=
1
;
TAOS_FIELD
*
fields
=
NULL
;
int32_t
dstColsNum
=
pStream
->
dstCols
;
int32_t
fieldsNum
=
0
;
if
(
toAnother
)
{
//init hash
tbHash
=
taosHashInit
(
100
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_NO_LOCK
);
fields
=
taos_fetch_fields
(
res
);
fieldsNum
=
tscNumOfFields
(
pQueryInfo
);
if
(
dstColsNum
==
-
1
)
dstColsNum
=
fieldsNum
;
//search split column
char
*
split
=
"tbname"
;
// default
if
(
pStream
->
split
)
split
=
pStream
->
split
;
for
(
int32_t
i
=
1
;
i
<
fieldsNum
;
i
++
)
{
if
(
strcasecmp
(
fields
[
i
].
name
,
split
)
==
0
)
{
colIdx
=
i
;
break
;
}
}
}
// save rows
for
(
int32_t
i
=
0
;
i
<
numOfRows
;
++
i
)
{
TAOS_ROW
row
=
taos_fetch_row
(
res
);
if
(
row
!=
NULL
)
{
tscDebug
(
"0x%"
PRIx64
" stream:%p fetch result"
,
pSql
->
self
,
pStream
);
tscStreamFillTimeGap
(
pStream
,
*
(
TSKEY
*
)
row
[
0
]);
pStream
->
stime
=
*
(
TSKEY
*
)
row
[
0
];
// user callback function
// write to another table if true
if
(
toAnother
)
{
tbHashAdd
(
tbHash
,
row
,
fields
,
colIdx
,
dstColsNum
);
if
(
i
==
numOfRows
-
1
)
//write last row to record last query time avoid query from begin for each
(
*
pStream
->
fp
)(
pStream
->
param
,
res
,
row
);
}
else
{
(
*
pStream
->
fp
)(
pStream
->
param
,
res
,
row
);
}
pStream
->
numOfRes
++
;
}
}
// write Another
if
(
toAnother
)
{
toAnotherTable
(
pSql
->
pTscObj
,
pStream
->
to
,
fields
,
dstColsNum
,
tbHash
);
taosHashCleanup
(
tbHash
);
}
if
(
!
pStream
->
isProject
)
{
pStream
->
stime
=
taosTimeAdd
(
pStream
->
stime
,
pStream
->
interval
.
sliding
,
pStream
->
interval
.
slidingUnit
,
pStream
->
precision
);
}
...
...
@@ -662,7 +854,50 @@ void cbParseSql(void* param, TAOS_RES* res, int code) {
return
;
}
TAOS_STREAM
*
taos_open_stream_withname
(
TAOS
*
taos
,
const
char
*
dstTable
,
const
char
*
sqlstr
,
void
(
*
fp
)(
void
*
param
,
TAOS_RES
*
,
TAOS_ROW
row
),
void
splitStreamSql
(
const
char
*
str
,
char
**
sql
,
char
**
to
,
char
**
split
)
{
// OLD FORMAT only sql
if
(
strncmp
(
str
,
LABEL_SQL
,
LABEL_SQL_LEN
)
!=
0
)
{
*
sql
=
tmalloc
(
strlen
(
str
)
+
1
);
if
(
*
sql
==
NULL
)
return
;
strcpy
(
*
sql
,
str
);
return
;
}
// NEW FORMAT sql:...to:...split:...
char
*
p1
=
strstr
(
str
+
LABEL_SQL_LEN
,
LABEL_TO
);
if
(
p1
==
NULL
)
{
char
*
p
=
(
char
*
)
str
+
LABEL_SQL_LEN
;
*
sql
=
(
char
*
)
tmalloc
(
strlen
(
p
)
+
1
);
strcpy
(
*
sql
,
p
);
return
;
}
// SQL value
int32_t
len
=
p1
-
str
-
LABEL_SQL_LEN
;
*
sql
=
(
char
*
)
tmalloc
(
len
+
1
);
strncpy
(
*
sql
,
str
+
LABEL_SQL_LEN
,
len
);
sql
[
len
]
=
0
;
// str end
// TO value
char
*
p2
=
strstr
(
p1
+
LABEL_TO_LEN
,
LABEL_SPLIT
);
if
(
p2
==
NULL
)
{
char
*
p
=
p1
+
LABEL_TO_LEN
;
*
to
=
(
char
*
)
tmalloc
(
strlen
(
p
)
+
1
);
strcpy
(
*
to
,
p
);
return
;
}
len
=
p2
-
p1
-
LABEL_TO_LEN
;
*
to
=
(
char
*
)
tmalloc
(
len
+
1
);
strncpy
(
*
to
,
p1
+
LABEL_TO_LEN
,
len
);
to
[
len
]
=
0
;
// str end
// SPLIT value
char
*
p
=
p2
+
LABEL_SPLIT_LEN
;
*
split
=
(
char
*
)
tmalloc
(
strlen
(
p
)
+
1
);
strcpy
(
*
split
,
p
);
}
TAOS_STREAM
*
taos_open_stream_withname
(
TAOS
*
taos
,
const
char
*
dstTable
,
int32_t
dstCols
,
const
char
*
sqlstr
,
void
(
*
fp
)(
void
*
param
,
TAOS_RES
*
,
TAOS_ROW
row
),
int64_t
stime
,
void
*
param
,
void
(
*
callback
)(
void
*
),
void
*
cqhandle
)
{
STscObj
*
pObj
=
(
STscObj
*
)
taos
;
if
(
pObj
==
NULL
||
pObj
->
signature
!=
pObj
)
return
NULL
;
...
...
@@ -698,6 +933,9 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const c
pStream
->
param
=
param
;
pStream
->
pSql
=
pSql
;
pStream
->
cqhandle
=
cqhandle
;
pStream
->
dstCols
=
dstCols
;
pStream
->
to
=
NULL
;
pStream
->
split
=
NULL
;
pSql
->
pStream
=
pStream
;
pSql
->
param
=
pStream
;
pSql
->
maxRetry
=
TSDB_MAX_REPLICA
;
...
...
@@ -706,7 +944,9 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const c
pSql
->
pStream
=
pStream
;
pSql
->
param
=
pStream
;
pSql
->
maxRetry
=
TSDB_MAX_REPLICA
;
pSql
->
sqlstr
=
calloc
(
1
,
strlen
(
sqlstr
)
+
1
);
// split stream sqlstr to sql,to,split
splitStreamSql
(
sqlstr
,
&
pSql
->
sqlstr
,
&
pStream
->
to
,
&
pStream
->
split
);
if
(
pSql
->
sqlstr
==
NULL
)
{
tscError
(
"0x%"
PRIx64
" failed to malloc sql string buffer"
,
pSql
->
self
);
tscFreeSqlObj
(
pSql
);
...
...
@@ -714,7 +954,7 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const c
return
NULL
;
}
strtolower
(
pSql
->
sqlstr
,
sqlstr
);
strtolower
(
pSql
->
sqlstr
,
pSql
->
sqlstr
);
pSql
->
fp
=
tscCreateStream
;
pSql
->
fetchFp
=
tscCreateStream
;
pSql
->
cmd
.
resColumnId
=
TSDB_RES_COL_ID
;
...
...
@@ -746,7 +986,7 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const c
TAOS_STREAM
*
taos_open_stream
(
TAOS
*
taos
,
const
char
*
sqlstr
,
void
(
*
fp
)(
void
*
param
,
TAOS_RES
*
,
TAOS_ROW
row
),
int64_t
stime
,
void
*
param
,
void
(
*
callback
)(
void
*
))
{
return
taos_open_stream_withname
(
taos
,
""
,
sqlstr
,
fp
,
stime
,
param
,
callback
,
NULL
);
return
taos_open_stream_withname
(
taos
,
""
,
-
1
,
sqlstr
,
fp
,
stime
,
param
,
callback
,
NULL
);
}
void
taos_close_stream
(
TAOS_STREAM
*
handle
)
{
...
...
@@ -772,6 +1012,16 @@ void taos_close_stream(TAOS_STREAM *handle) {
pStream
->
fp
(
pStream
->
param
,
NULL
,
NULL
);
taos_free_result
(
pSql
);
// free malloc
if
(
pStream
->
to
)
{
tfree
(
pStream
->
to
);
pStream
->
to
=
NULL
;
}
if
(
pStream
->
split
)
{
tfree
(
pStream
->
split
);
pStream
->
split
=
NULL
;
}
tfree
(
pStream
);
}
}
src/cq/src/cqMain.c
浏览文件 @
8b56d495
...
...
@@ -423,7 +423,7 @@ static void cqProcessCreateTimer(void *param, void *tmrId) {
}
// inner implement in tscStream.c
TAOS_STREAM
*
taos_open_stream_withname
(
TAOS
*
taos
,
const
char
*
desName
,
const
char
*
sqlstr
,
void
(
*
fp
)(
void
*
param
,
TAOS_RES
*
,
TAOS_ROW
row
),
TAOS_STREAM
*
taos_open_stream_withname
(
TAOS
*
taos
,
const
char
*
desName
,
int32_t
dstCols
,
const
char
*
sqlstr
,
void
(
*
fp
)(
void
*
param
,
TAOS_RES
*
,
TAOS_ROW
row
),
int64_t
stime
,
void
*
param
,
void
(
*
callback
)(
void
*
),
void
*
cqhandle
);
static
void
cqCreateStream
(
SCqContext
*
pContext
,
SCqObj
*
pObj
)
{
...
...
@@ -436,9 +436,11 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
}
pObj
->
tmrId
=
0
;
int32_t
dstCols
=
-
1
;
if
(
pObj
->
pSchema
)
dstCols
=
pObj
->
pSchema
->
numOfCols
;
if
(
pObj
->
pStream
==
NULL
)
{
pObj
->
pStream
=
taos_open_stream_withname
(
pContext
->
dbConn
,
pObj
->
dstTable
,
pObj
->
sqlStr
,
cqProcessStreamRes
,
\
pObj
->
pStream
=
taos_open_stream_withname
(
pContext
->
dbConn
,
pObj
->
dstTable
,
dstCols
,
pObj
->
sqlStr
,
cqProcessStreamRes
,
\
INT64_MIN
,
(
void
*
)
pObj
->
rid
,
NULL
,
pContext
);
// TODO the pObj->pStream may be released if error happens
...
...
src/inc/taos.h
浏览文件 @
8b56d495
...
...
@@ -138,6 +138,7 @@ DLL_EXPORT int taos_affected_rows(TAOS_RES *res);
DLL_EXPORT
TAOS_FIELD
*
taos_fetch_fields
(
TAOS_RES
*
res
);
DLL_EXPORT
int
taos_select_db
(
TAOS
*
taos
,
const
char
*
db
);
DLL_EXPORT
int
taos_print_row
(
char
*
str
,
TAOS_ROW
row
,
TAOS_FIELD
*
fields
,
int
num_fields
);
DLL_EXPORT
int
taos_print_row_ex
(
char
*
str
,
TAOS_ROW
row
,
TAOS_FIELD
*
fields
,
int
num_fields
,
char
split
,
bool
addQuota
);
DLL_EXPORT
void
taos_stop_query
(
TAOS_RES
*
res
);
DLL_EXPORT
bool
taos_is_null
(
TAOS_RES
*
res
,
int32_t
row
,
int32_t
col
);
DLL_EXPORT
int
taos_fetch_block
(
TAOS_RES
*
res
,
TAOS_ROW
*
rows
);
...
...
src/inc/ttokendef.h
浏览文件 @
8b56d495
...
...
@@ -133,85 +133,87 @@
#define TK_UNSIGNED 115
#define TK_TAGS 116
#define TK_USING 117
#define TK_NULL 118
#define TK_NOW 119
#define TK_SELECT 120
#define TK_UNION 121
#define TK_ALL 122
#define TK_DISTINCT 123
#define TK_FROM 124
#define TK_VARIABLE 125
#define TK_INTERVAL 126
#define TK_EVERY 127
#define TK_SESSION 128
#define TK_STATE_WINDOW 129
#define TK_FILL 130
#define TK_SLIDING 131
#define TK_ORDER 132
#define TK_BY 133
#define TK_ASC 134
#define TK_GROUP 135
#define TK_HAVING 136
#define TK_LIMIT 137
#define TK_OFFSET 138
#define TK_SLIMIT 139
#define TK_SOFFSET 140
#define TK_WHERE 141
#define TK_RESET 142
#define TK_QUERY 143
#define TK_SYNCDB 144
#define TK_ADD 145
#define TK_COLUMN 146
#define TK_MODIFY 147
#define TK_TAG 148
#define TK_CHANGE 149
#define TK_SET 150
#define TK_KILL 151
#define TK_CONNECTION 152
#define TK_STREAM 153
#define TK_COLON 154
#define TK_ABORT 155
#define TK_AFTER 156
#define TK_ATTACH 157
#define TK_BEFORE 158
#define TK_BEGIN 159
#define TK_CASCADE 160
#define TK_CLUSTER 161
#define TK_CONFLICT 162
#define TK_COPY 163
#define TK_DEFERRED 164
#define TK_DELIMITERS 165
#define TK_DETACH 166
#define TK_EACH 167
#define TK_END 168
#define TK_EXPLAIN 169
#define TK_FAIL 170
#define TK_FOR 171
#define TK_IGNORE 172
#define TK_IMMEDIATE 173
#define TK_INITIALLY 174
#define TK_INSTEAD 175
#define TK_MATCH 176
#define TK_KEY 177
#define TK_OF 178
#define TK_RAISE 179
#define TK_REPLACE 180
#define TK_RESTRICT 181
#define TK_ROW 182
#define TK_STATEMENT 183
#define TK_TRIGGER 184
#define TK_VIEW 185
#define TK_SEMI 186
#define TK_NONE 187
#define TK_PREV 188
#define TK_LINEAR 189
#define TK_IMPORT 190
#define TK_TBNAME 191
#define TK_JOIN 192
#define TK_INSERT 193
#define TK_INTO 194
#define TK_VALUES 195
#define TK_FILE 196
#define TK_TO 118
#define TK_SPLIT 119
#define TK_NULL 120
#define TK_NOW 121
#define TK_SELECT 122
#define TK_UNION 123
#define TK_ALL 124
#define TK_DISTINCT 125
#define TK_FROM 126
#define TK_VARIABLE 127
#define TK_INTERVAL 128
#define TK_EVERY 129
#define TK_SESSION 130
#define TK_STATE_WINDOW 131
#define TK_FILL 132
#define TK_SLIDING 133
#define TK_ORDER 134
#define TK_BY 135
#define TK_ASC 136
#define TK_GROUP 137
#define TK_HAVING 138
#define TK_LIMIT 139
#define TK_OFFSET 140
#define TK_SLIMIT 141
#define TK_SOFFSET 142
#define TK_WHERE 143
#define TK_RESET 144
#define TK_QUERY 145
#define TK_SYNCDB 146
#define TK_ADD 147
#define TK_COLUMN 148
#define TK_MODIFY 149
#define TK_TAG 150
#define TK_CHANGE 151
#define TK_SET 152
#define TK_KILL 153
#define TK_CONNECTION 154
#define TK_STREAM 155
#define TK_COLON 156
#define TK_ABORT 157
#define TK_AFTER 158
#define TK_ATTACH 159
#define TK_BEFORE 160
#define TK_BEGIN 161
#define TK_CASCADE 162
#define TK_CLUSTER 163
#define TK_CONFLICT 164
#define TK_COPY 165
#define TK_DEFERRED 166
#define TK_DELIMITERS 167
#define TK_DETACH 168
#define TK_EACH 169
#define TK_END 170
#define TK_EXPLAIN 171
#define TK_FAIL 172
#define TK_FOR 173
#define TK_IGNORE 174
#define TK_IMMEDIATE 175
#define TK_INITIALLY 176
#define TK_INSTEAD 177
#define TK_MATCH 178
#define TK_KEY 179
#define TK_OF 180
#define TK_RAISE 181
#define TK_REPLACE 182
#define TK_RESTRICT 183
#define TK_ROW 184
#define TK_STATEMENT 185
#define TK_TRIGGER 186
#define TK_VIEW 187
#define TK_SEMI 188
#define TK_NONE 189
#define TK_PREV 190
#define TK_LINEAR 191
#define TK_IMPORT 192
#define TK_TBNAME 193
#define TK_JOIN 194
#define TK_INSERT 195
#define TK_INTO 196
#define TK_VALUES 197
#define TK_FILE 198
#define TK_SPACE 300
...
...
src/mnode/src/mnodeTable.c
浏览文件 @
8b56d495
...
...
@@ -845,7 +845,7 @@ static int32_t mnodeProcessBatchCreateTableMsg(SMnodeMsg *pMsg) {
memcpy
(
pSubMsg
->
pCont
+
sizeof
(
SCMCreateTableMsg
),
p
,
htonl
(
p
->
len
));
code
=
mnodeValidateCreateTableMsg
(
p
,
pSubMsg
);
if
(
code
==
TSDB_CODE_SUCCESS
||
code
==
TSDB_CODE_MND_TABLE_ALREADY_EXIST
)
{
if
(
code
==
TSDB_CODE_SUCCESS
||
(
p
->
igExists
==
1
&&
code
==
TSDB_CODE_MND_TABLE_ALREADY_EXIST
)
)
{
++
pSubMsg
->
pBatchMasterMsg
->
successed
;
mnodeDestroySubMsg
(
pSubMsg
);
continue
;
...
...
@@ -1040,11 +1040,21 @@ static int32_t mnodeCreateSuperTableCb(SMnodeMsg *pMsg, int32_t code) {
if
(
code
==
TSDB_CODE_SUCCESS
)
{
mLInfo
(
"stable:%s, is created in sdb, uid:%"
PRIu64
,
pTable
->
info
.
tableId
,
pTable
->
uid
);
if
(
pMsg
->
pBatchMasterMsg
)
pMsg
->
pBatchMasterMsg
->
successed
++
;
}
else
{
mError
(
"msg:%p, app:%p stable:%s, failed to create in sdb, reason:%s"
,
pMsg
,
pMsg
->
rpcMsg
.
ahandle
,
pTable
->
info
.
tableId
,
tstrerror
(
code
));
SSdbRow
desc
=
{.
type
=
SDB_OPER_GLOBAL
,
.
pObj
=
pTable
,
.
pTable
=
tsSuperTableSdb
};
sdbDeleteRow
(
&
desc
);
if
(
pMsg
->
pBatchMasterMsg
)
pMsg
->
pBatchMasterMsg
->
received
++
;
}
// if super table create by batch msg, check done and send finished to client
if
(
pMsg
->
pBatchMasterMsg
)
{
if
(
pMsg
->
pBatchMasterMsg
->
successed
+
pMsg
->
pBatchMasterMsg
->
received
>=
pMsg
->
pBatchMasterMsg
->
expected
)
dnodeSendRpcMWriteRsp
(
pMsg
->
pBatchMasterMsg
,
code
);
}
return
code
;
...
...
src/query/inc/qSqlparser.h
浏览文件 @
8b56d495
...
...
@@ -25,6 +25,7 @@ extern "C" {
#include "tstrbuild.h"
#include "ttoken.h"
#include "tvariant.h"
#include "tname.h"
#define ParseTOKENTYPE SStrToken
...
...
@@ -139,8 +140,11 @@ typedef struct SCreatedTableInfo {
typedef
struct
SCreateTableSql
{
SStrToken
name
;
// table name, create table [name] xxx
SStrToken
to
;
// create stream to anohter table
SStrToken
split
;
// split columns
int8_t
type
;
// create normal table/from super table/ stream
bool
existCheck
;
SName
toSName
;
struct
{
SArray
*
pTagColumns
;
// SArray<TAOS_FIELD>
...
...
@@ -313,6 +317,7 @@ SArray *setSubclause(SArray *pList, void *pSqlNode);
SArray
*
appendSelectClause
(
SArray
*
pList
,
void
*
pSubclause
);
void
setCreatedTableName
(
SSqlInfo
*
pInfo
,
SStrToken
*
pTableNameToken
,
SStrToken
*
pIfNotExists
);
void
setCreatedStreamOpt
(
SSqlInfo
*
pInfo
,
SStrToken
*
pTo
,
SStrToken
*
pSplit
);
void
SqlInfoDestroy
(
SSqlInfo
*
pInfo
);
...
...
src/query/inc/sql.y
浏览文件 @
8b56d495
...
...
@@ -413,14 +413,28 @@ tagNamelist(A) ::= ids(X). {A = taosArrayInit(4, sizeof(SSt
// create stream
// create table table_name as select count(*) from super_table_name interval(time)
create_table_args(A) ::= ifnotexists(U) ids(V) cpxName(Z) AS select(S). {
create_table_args(A) ::= ifnotexists(U) ids(V) cpxName(Z)
to_opt(E) split_opt(F)
AS select(S). {
A = tSetCreateTableInfo(NULL, NULL, S, TSQL_CREATE_STREAM);
setSqlInfo(pInfo, A, NULL, TSDB_SQL_CREATE_TABLE);
setCreatedStreamOpt(pInfo, &E, &F);
V.n += Z.n;
setCreatedTableName(pInfo, &V, &U);
}
// to_opt
%type to_opt {SStrToken}
to_opt(A) ::= . {A.n = 0;}
to_opt(A) ::= TO ids(X) cpxName(Y). {
A = X;
A.n += Y.n;
}
// split_opt
%type to_split {SStrToken}
split_opt(A) ::= . {A.n = 0;}
split_opt(A) ::= SPLIT ids(X). { A = X;}
%type column{TAOS_FIELD}
%type columnlist{SArray*}
%destructor columnlist {taosArrayDestroy($$);}
...
...
src/query/src/qSqlParser.c
浏览文件 @
8b56d495
...
...
@@ -1002,6 +1002,11 @@ void setCreatedTableName(SSqlInfo *pInfo, SStrToken *pTableNameToken, SStrToken
pInfo
->
pCreateTableInfo
->
existCheck
=
(
pIfNotExists
->
n
!=
0
);
}
void
setCreatedStreamOpt
(
SSqlInfo
*
pInfo
,
SStrToken
*
pTo
,
SStrToken
*
pSplit
)
{
pInfo
->
pCreateTableInfo
->
to
=
*
pTo
;
pInfo
->
pCreateTableInfo
->
split
=
*
pSplit
;
}
void
setDCLSqlElems
(
SSqlInfo
*
pInfo
,
int32_t
type
,
int32_t
nParam
,
...)
{
pInfo
->
type
=
type
;
if
(
nParam
==
0
)
{
...
...
src/query/src/sql.c
浏览文件 @
8b56d495
此差异已折叠。
点击以展开。
src/util/inc/ttoken.h
浏览文件 @
8b56d495
...
...
@@ -63,6 +63,15 @@ uint32_t tGetToken(char *z, uint32_t *tokenType);
*/
SStrToken
tStrGetToken
(
char
*
str
,
int32_t
*
i
,
bool
isPrevOptr
);
/**
* strcpy implement source from SStrToken
*
* @param dst copy to
* @param srcToken copy from
* @return size of copy successful bytes, not include '\0'
*/
int32_t
tStrNCpy
(
char
*
dst
,
SStrToken
*
srcToken
);
/**
* check if it is a keyword or not
* @param z
...
...
src/util/src/ttokenizer.c
浏览文件 @
8b56d495
...
...
@@ -227,6 +227,8 @@ static SKeyword keywordTable[] = {
{
"OUTPUTTYPE"
,
TK_OUTPUTTYPE
},
{
"AGGREGATE"
,
TK_AGGREGATE
},
{
"BUFSIZE"
,
TK_BUFSIZE
},
{
"TO"
,
TK_TO
},
{
"SPLIT"
,
TK_SPLIT
},
};
static
const
char
isIdChar
[]
=
{
...
...
@@ -676,6 +678,18 @@ SStrToken tStrGetToken(char* str, int32_t* i, bool isPrevOptr) {
return
t0
;
}
/**
* strcpy implement source from SStrToken
*
* @param dst copy to
* @param srcToken copy from
* @return size of copy successful bytes
*/
int32_t
tStrNCpy
(
char
*
dst
,
SStrToken
*
srcToken
)
{
strncpy
(
dst
,
srcToken
->
z
,
srcToken
->
n
);
return
srcToken
->
n
;
}
bool
taosIsKeyWordToken
(
const
char
*
z
,
int32_t
len
)
{
return
(
tKeywordCode
((
char
*
)
z
,
len
)
!=
TK_ID
);
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录