Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
f1c1501b
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
f1c1501b
编写于
2月 14, 2022
作者:
S
shenglian zhou
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/develop' into szhou/feature/td-11218
上级
12a52088
ac6c719a
变更
24
显示空白变更内容
内联
并排
Showing
24 changed file
with
2447 addition
and
2314 deletion
+2447
-2314
src/client/inc/tscUtil.h
src/client/inc/tscUtil.h
+8
-0
src/client/inc/tsclient.h
src/client/inc/tsclient.h
+6
-0
src/client/src/taos.def
src/client/src/taos.def
+1
-0
src/client/src/tscSQLParser.c
src/client/src/tscSQLParser.c
+15
-0
src/client/src/tscServer.c
src/client/src/tscServer.c
+87
-17
src/client/src/tscSql.c
src/client/src/tscSql.c
+122
-2
src/client/src/tscStream.c
src/client/src/tscStream.c
+430
-83
src/common/inc/tglobal.h
src/common/inc/tglobal.h
+1
-1
src/common/src/tglobal.c
src/common/src/tglobal.c
+4
-4
src/cq/src/cqMain.c
src/cq/src/cqMain.c
+5
-3
src/inc/taos.h
src/inc/taos.h
+4
-0
src/inc/trpc.h
src/inc/trpc.h
+1
-0
src/inc/ttokendef.h
src/inc/ttokendef.h
+82
-80
src/mnode/src/mnodeTable.c
src/mnode/src/mnodeTable.c
+16
-6
src/query/inc/qSqlparser.h
src/query/inc/qSqlparser.h
+5
-0
src/query/inc/sql.y
src/query/inc/sql.y
+15
-1
src/query/src/qSqlParser.c
src/query/src/qSqlParser.c
+5
-0
src/query/src/queryMain.c
src/query/src/queryMain.c
+0
-45
src/query/src/sql.c
src/query/src/sql.c
+1595
-2071
src/rpc/src/rpcMain.c
src/rpc/src/rpcMain.c
+6
-0
src/util/inc/tidpool.h
src/util/inc/tidpool.h
+3
-0
src/util/inc/ttoken.h
src/util/inc/ttoken.h
+9
-0
src/util/src/tidpool.c
src/util/src/tidpool.c
+12
-0
src/util/src/ttokenizer.c
src/util/src/ttokenizer.c
+15
-1
未找到文件。
src/client/inc/tscUtil.h
浏览文件 @
f1c1501b
...
...
@@ -29,6 +29,14 @@ extern "C" {
#include "tsched.h"
#include "tsclient.h"
#define LABEL_SQL "sql:"
#define LABEL_TO " to:"
#define LABEL_SPLIT " split:"
#define LABEL_SQL_LEN (sizeof(LABEL_SQL) - 1)
#define LABEL_TO_LEN (sizeof(LABEL_TO) - 1)
#define LABEL_SPLIT_LEN (sizeof(LABEL_SPLIT) - 1)
#define UTIL_TABLE_IS_SUPER_TABLE(metaInfo) \
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_SUPER_TABLE))
...
...
src/client/inc/tsclient.h
浏览文件 @
f1c1501b
...
...
@@ -405,6 +405,10 @@ typedef struct SSqlStream {
int16_t
precision
;
int64_t
num
;
// number of computing count
int32_t
dstCols
;
// dstTable has number of columns
char
*
to
;
char
*
split
;
/*
* keep the number of current result in computing,
* the value will be set to 0 before set timer for next computing
...
...
@@ -484,6 +488,8 @@ TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port,
void
*
param
,
TAOS
**
taos
);
TAOS_RES
*
taos_query_h
(
TAOS
*
taos
,
const
char
*
sqlstr
,
int64_t
*
res
);
TAOS_RES
*
taos_query_ra
(
TAOS
*
taos
,
const
char
*
sqlstr
,
__async_cb_func_t
fp
,
void
*
param
);
// get taos connection unused session number
int32_t
taos_unused_session
(
TAOS
*
taos
);
void
waitForQueryRsp
(
void
*
param
,
TAOS_RES
*
tres
,
int
code
);
...
...
src/client/src/taos.def
浏览文件 @
f1c1501b
...
...
@@ -53,3 +53,4 @@ taos_is_null
taos_insert_lines
taos_schemaless_insert
taos_result_block
taos_print_row_ex
\ No newline at end of file
src/client/src/tscSQLParser.c
浏览文件 @
f1c1501b
...
...
@@ -1443,6 +1443,7 @@ int32_t tscSetTableFullName(SName* pName, SStrToken* pTableName, SSqlObj* pSql,
const
char
*
msg3
=
"no acctId"
;
const
char
*
msg4
=
"db name too long"
;
const
char
*
msg5
=
"table name too long"
;
const
char
*
msg6
=
"table name empty"
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
...
...
@@ -1494,6 +1495,8 @@ int32_t tscSetTableFullName(SName* pName, SStrToken* pTableName, SSqlObj* pSql,
if
(
pTableName
->
n
>=
TSDB_TABLE_NAME_LEN
)
{
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg1
);
}
else
if
(
pTableName
->
n
==
0
)
{
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg6
);
}
char
name
[
TSDB_TABLE_FNAME_LEN
]
=
{
0
};
...
...
@@ -9082,6 +9085,18 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg1
);
}
// check to valid and create to name
if
(
pInfo
->
pCreateTableInfo
->
to
.
n
>
0
)
{
bool
dbInclude
=
false
;
if
(
tscValidateName
(
&
pInfo
->
pCreateTableInfo
->
to
,
false
,
&
dbInclude
)
!=
TSDB_CODE_SUCCESS
)
{
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg1
);
}
int32_t
code
=
tscSetTableFullName
(
&
pInfo
->
pCreateTableInfo
->
toSName
,
&
pInfo
->
pCreateTableInfo
->
to
,
pSql
,
dbInclude
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
}
SRelationInfo
*
pFromInfo
=
pInfo
->
pCreateTableInfo
->
pSelect
->
from
;
if
(
pFromInfo
==
NULL
||
taosArrayGetSize
(
pFromInfo
->
list
)
==
0
)
{
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg6
);
...
...
src/client/src/tscServer.c
浏览文件 @
f1c1501b
...
...
@@ -1546,11 +1546,49 @@ int tscEstimateCreateTableMsgLength(SSqlObj *pSql, SSqlInfo *pInfo) {
if
(
pCreateTableInfo
->
pSelect
!=
NULL
)
{
size
+=
(
pCreateTableInfo
->
pSelect
->
sqlstr
.
n
+
1
);
// add size = create super table with same columns and 1 tags
if
(
pCreateTableInfo
->
to
.
n
>
0
)
{
size
+=
sizeof
(
SCreateTableMsg
);
size
+=
sizeof
(
SSchema
)
*
(
pCmd
->
numOfCols
+
1
);
size
+=
pCreateTableInfo
->
to
.
n
+
4
;
// to:
if
(
pCreateTableInfo
->
split
.
n
>
0
)
size
+=
pCreateTableInfo
->
split
.
n
+
7
;
// split:
}
}
return
size
+
TSDB_EXTRA_PAYLOAD_SIZE
;
}
char
*
fillCreateSTableMsg
(
SCreateTableMsg
*
pCreateMsg
,
SCreateTableSql
*
pTableSql
,
SSqlCmd
*
pCmd
,
SSqlInfo
*
pInfo
)
{
// SET
SSchema
*
pSchema
=
(
SSchema
*
)
pCreateMsg
->
schema
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
);
pCreateMsg
->
igExists
=
0
;
pCreateMsg
->
sqlLen
=
0
;
// FullName like acctID.dbName.tableName
tNameExtractFullName
(
&
pInfo
->
pCreateTableInfo
->
toSName
,
pCreateMsg
->
tableName
);
// copy columns
pCreateMsg
->
numOfColumns
=
htons
(
pCmd
->
numOfCols
);
for
(
int
i
=
0
;
i
<
pCmd
->
numOfCols
;
++
i
)
{
TAOS_FIELD
*
pField
=
tscFieldInfoGetField
(
&
pQueryInfo
->
fieldsInfo
,
i
);
pSchema
->
type
=
pField
->
type
;
strcpy
(
pSchema
->
name
,
pField
->
name
);
pSchema
->
bytes
=
htons
(
pField
->
bytes
);
pSchema
++
;
}
// append one tag
pCreateMsg
->
numOfTags
=
htons
(
1
);
// only one tag immutable
pSchema
->
type
=
TSDB_DATA_TYPE_INT
;
pSchema
->
bytes
=
htons
(
INT_BYTES
);
strcpy
(
pSchema
->
name
,
"tag1"
);
pSchema
++
;
return
(
char
*
)
pSchema
;
}
int
tscBuildCreateTableMsg
(
SSqlObj
*
pSql
,
SSqlInfo
*
pInfo
)
{
int
msgLen
=
0
;
int
size
=
0
;
...
...
@@ -1608,39 +1646,71 @@ int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pCreate
->
len
=
htonl
(
len
);
}
}
else
{
// create (super) table
// FIRST MSG
SCreateTableMsg
*
pCreate
=
pCreateMsg
;
pCreateTableMsg
->
numOfTables
=
htonl
(
1
);
// only one table will be created
int32_t
code
=
tNameExtractFullName
(
&
pTableMetaInfo
->
name
,
pCreateMsg
->
tableName
)
;
int32_t
code
=
tNameExtractFullName
(
&
pTableMetaInfo
->
name
,
pCreate
->
tableName
);
bool
to
=
pInfo
->
pCreateTableInfo
->
to
.
n
>
0
;
assert
(
code
==
0
);
SCreateTableSql
*
pCreateTable
=
pInfo
->
pCreateTableInfo
;
pCreateMsg
->
igExists
=
pCreateTable
->
existCheck
?
1
:
0
;
pCreateMsg
->
numOfColumns
=
htons
(
pCmd
->
numOfCols
);
pCreateMsg
->
numOfTags
=
htons
(
pCmd
->
count
);
pCreateMsg
->
sqlLen
=
0
;
pMsg
=
(
char
*
)
pCreateMsg
->
schema
;
pSchema
=
(
SSchema
*
)
pCreateMsg
->
schema
;
pCreate
->
igExists
=
pCreateTable
->
existCheck
?
1
:
0
;
pCreate
->
numOfColumns
=
htons
(
pCmd
->
numOfCols
);
pCreate
->
numOfTags
=
htons
(
pCmd
->
count
);
pCreate
->
sqlLen
=
0
;
pSchema
=
(
SSchema
*
)
pCreate
->
schema
;
//copy schema
for
(
int
i
=
0
;
i
<
pCmd
->
numOfCols
+
pCmd
->
count
;
++
i
)
{
TAOS_FIELD
*
pField
=
tscFieldInfoGetField
(
&
pQueryInfo
->
fieldsInfo
,
i
);
pSchema
->
type
=
pField
->
type
;
strcpy
(
pSchema
->
name
,
pField
->
name
);
pSchema
->
bytes
=
htons
(
pField
->
bytes
);
pSchema
++
;
}
//copy stream sql if have
pMsg
=
(
char
*
)
pSchema
;
if
(
type
==
TSQL_CREATE_STREAM
)
{
// check if it is a stream sql
SSqlNode
*
pQuerySql
=
pInfo
->
pCreateTableInfo
->
pSelect
;
int16_t
len
=
0
;
if
(
to
)
{
//sql:
strcpy
(
pMsg
,
LABEL_SQL
);
len
+=
LABEL_SQL_LEN
;
len
+=
tStrNCpy
(
pMsg
+
len
,
&
pQuerySql
->
sqlstr
);
//to
strcpy
(
pMsg
+
len
,
LABEL_TO
);
len
+=
LABEL_TO_LEN
;
len
+=
tStrNCpy
(
pMsg
+
len
,
&
pInfo
->
pCreateTableInfo
->
to
);
//split if
if
(
pInfo
->
pCreateTableInfo
->
split
.
n
>
0
)
{
strcpy
(
pMsg
+
len
,
LABEL_SPLIT
);
len
+=
LABEL_SPLIT_LEN
;
len
+=
tStrNCpy
(
pMsg
+
len
,
&
pInfo
->
pCreateTableInfo
->
split
);
}
// append string end flag
pMsg
[
len
++
]
=
0
;
pMsg
+=
len
;
pCreate
->
sqlLen
=
htons
(
len
);
}
else
{
len
=
pQuerySql
->
sqlstr
.
n
;
strncpy
(
pMsg
,
pQuerySql
->
sqlstr
.
z
,
len
);
pMsg
[
len
++
]
=
0
;
// string end
pMsg
+=
len
;
pCreate
->
sqlLen
=
htons
(
len
);
}
}
// calc first msg length
int32_t
len
=
(
int32_t
)(
pMsg
-
(
char
*
)
pCreate
);
pCreate
->
len
=
htonl
(
len
);
strncpy
(
pMsg
,
pQuerySql
->
sqlstr
.
z
,
pQuerySql
->
sqlstr
.
n
+
1
);
pCreateMsg
->
sqlLen
=
htons
(
pQuerySql
->
sqlstr
.
n
+
1
);
pMsg
+=
pQuerySql
->
sqlstr
.
n
+
1
;
// filling second msg if to have value
if
(
to
)
{
pCreate
=
(
SCreateTableMsg
*
)
pMsg
;
pMsg
=
fillCreateSTableMsg
(
pCreate
,
pCreateTable
,
pCmd
,
pInfo
);
len
=
(
int32_t
)(
pMsg
-
(
char
*
)
pCreate
);
pCreate
->
len
=
htonl
(
len
);
pCreateTableMsg
->
numOfTables
=
htonl
(
2
);
}
}
...
...
src/client/src/tscSql.c
浏览文件 @
f1c1501b
...
...
@@ -28,6 +28,7 @@
#include "tutil.h"
#include "ttimer.h"
#include "tscProfile.h"
#include "tidpool.h"
static
char
clusterDefaultId
[]
=
"clusterDefaultId"
;
static
bool
validImpl
(
const
char
*
str
,
size_t
maxsize
)
{
...
...
@@ -307,6 +308,25 @@ void taos_close(TAOS *taos) {
taosRemoveRef
(
tscRefId
,
pObj
->
rid
);
}
// get taos connection unused session number
int32_t
taos_unused_session
(
TAOS
*
taos
)
{
// param valid check
STscObj
*
pObj
=
(
STscObj
*
)
taos
;
if
(
pObj
==
NULL
||
pObj
->
signature
!=
pObj
)
{
tscError
(
"pObj:%p is NULL or freed"
,
pObj
);
terrno
=
TSDB_CODE_TSC_DISCONNECTED
;
return
0
;
}
if
(
pObj
->
pRpcObj
==
NULL
)
{
tscError
(
"pObj:%p pRpcObj is NULL."
,
pObj
);
terrno
=
TSDB_CODE_TSC_DISCONNECTED
;
return
0
;
}
// get number
return
rpcUnusedSession
(
pObj
->
pRpcObj
->
pDnodeConn
,
false
);
}
void
waitForQueryRsp
(
void
*
param
,
TAOS_RES
*
tres
,
int
code
)
{
assert
(
tres
!=
NULL
);
...
...
@@ -807,13 +827,16 @@ bool taos_is_update_query(TAOS_RES *res) {
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
return
((
pCmd
->
command
>=
TSDB_SQL_INSERT
&&
pCmd
->
command
<=
TSDB_SQL_DROP_DNODE
)
||
TSDB_SQL_RESET_CACHE
==
pCmd
->
command
||
TSDB_SQL_USE_DB
==
pCmd
->
command
);
}
int
taos_print_row
(
char
*
str
,
TAOS_ROW
row
,
TAOS_FIELD
*
fields
,
int
num_fields
)
{
return
taos_print_row_ex
(
str
,
row
,
fields
,
num_fields
,
' '
,
false
);
}
int
taos_print_row_ex
(
char
*
str
,
TAOS_ROW
row
,
TAOS_FIELD
*
fields
,
int
num_fields
,
char
split
,
bool
addQuota
)
{
int
len
=
0
;
for
(
int
i
=
0
;
i
<
num_fields
;
++
i
)
{
if
(
i
>
0
)
{
str
[
len
++
]
=
' '
;
str
[
len
++
]
=
split
;
}
if
(
row
[
i
]
==
NULL
)
{
...
...
@@ -875,8 +898,22 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields)
assert
(
charLen
<=
fields
[
i
].
bytes
*
TSDB_NCHAR_SIZE
&&
charLen
>=
0
);
}
// add pre quotaion if require
if
(
addQuota
)
{
*
(
str
+
len
)
=
'\''
;
len
+=
1
;
}
// copy content
memcpy
(
str
+
len
,
row
[
i
],
charLen
);
len
+=
charLen
;
// add end quotaion if require
if
(
addQuota
)
{
*
(
str
+
len
)
=
'\''
;
len
+=
1
;
}
}
break
;
case
TSDB_DATA_TYPE_TIMESTAMP
:
...
...
@@ -893,6 +930,89 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields)
return
len
;
}
// print field value to str
int
taos_print_field
(
char
*
str
,
void
*
value
,
TAOS_FIELD
*
field
)
{
// check valid
if
(
str
==
NULL
||
value
==
NULL
||
field
==
NULL
)
{
return
0
;
}
// get value
int
len
=
0
;
switch
(
field
->
type
)
{
//
// fixed length
//
case
TSDB_DATA_TYPE_TINYINT
:
len
=
sprintf
(
str
,
"%d"
,
*
((
int8_t
*
)
value
));
break
;
case
TSDB_DATA_TYPE_UTINYINT
:
len
=
sprintf
(
str
,
"%u"
,
*
((
uint8_t
*
)
value
));
break
;
case
TSDB_DATA_TYPE_SMALLINT
:
len
=
sprintf
(
str
,
"%d"
,
*
((
int16_t
*
)
value
));
break
;
case
TSDB_DATA_TYPE_USMALLINT
:
len
=
sprintf
(
str
,
"%u"
,
*
((
uint16_t
*
)
value
));
break
;
case
TSDB_DATA_TYPE_INT
:
len
=
sprintf
(
str
,
"%d"
,
*
((
int32_t
*
)
value
));
break
;
case
TSDB_DATA_TYPE_UINT
:
len
=
sprintf
(
str
,
"%u"
,
*
((
uint32_t
*
)
value
));
break
;
case
TSDB_DATA_TYPE_BIGINT
:
len
=
sprintf
(
str
,
"%"
PRId64
,
*
((
int64_t
*
)
value
));
break
;
case
TSDB_DATA_TYPE_UBIGINT
:
len
=
sprintf
(
str
,
"%"
PRIu64
,
*
((
uint64_t
*
)
value
));
break
;
case
TSDB_DATA_TYPE_FLOAT
:
{
float
fv
=
0
;
fv
=
GET_FLOAT_VAL
(
value
);
len
=
sprintf
(
str
,
"%f"
,
fv
);
}
break
;
case
TSDB_DATA_TYPE_DOUBLE
:
{
double
dv
=
0
;
dv
=
GET_DOUBLE_VAL
(
value
);
len
=
sprintf
(
str
,
"%lf"
,
dv
);
}
break
;
case
TSDB_DATA_TYPE_TIMESTAMP
:
len
=
sprintf
(
str
,
"%"
PRId64
,
*
((
int64_t
*
)
value
));
break
;
case
TSDB_DATA_TYPE_BOOL
:
len
=
sprintf
(
str
,
"%d"
,
*
((
int8_t
*
)
value
));
//
// variant length
//
case
TSDB_DATA_TYPE_BINARY
:
case
TSDB_DATA_TYPE_NCHAR
:
{
len
=
varDataLen
((
char
*
)
value
-
VARSTR_HEADER_SIZE
);
if
(
field
->
type
==
TSDB_DATA_TYPE_BINARY
)
{
assert
(
len
<=
field
->
bytes
&&
len
>=
0
);
}
else
{
assert
(
len
<=
field
->
bytes
*
TSDB_NCHAR_SIZE
&&
len
>=
0
);
}
memcpy
(
str
,
value
,
len
);
}
break
;
default:
break
;
}
return
len
;
}
static
void
asyncCallback
(
void
*
param
,
TAOS_RES
*
tres
,
int
code
)
{
assert
(
param
!=
NULL
);
SSqlObj
*
pSql
=
((
SSqlObj
*
)
param
);
...
...
src/client/src/tscStream.c
浏览文件 @
f1c1501b
...
...
@@ -30,6 +30,7 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf
static
void
tscProcessStreamRetrieveResult
(
void
*
param
,
TAOS_RES
*
res
,
int
numOfRows
);
static
void
tscSetNextLaunchTimer
(
SSqlStream
*
pStream
,
SSqlObj
*
pSql
);
static
void
tscSetRetryTimer
(
SSqlStream
*
pStream
,
SSqlObj
*
pSql
,
int64_t
timer
);
static
int64_t
getLaunchTimeDelay
(
const
SSqlStream
*
pStream
);
static
int64_t
getDelayValueAfterTimewindowClosed
(
SSqlStream
*
pStream
,
int64_t
launchDelay
)
{
return
taosGetTimestamp
(
pStream
->
precision
)
+
launchDelay
-
pStream
->
stime
-
1
;
...
...
@@ -49,7 +50,7 @@ static bool isProjectStream(SQueryInfo* pQueryInfo) {
static
int64_t
tscGetRetryDelayTime
(
SSqlStream
*
pStream
,
int64_t
slidingTime
,
int16_t
prec
)
{
float
retryRangeFactor
=
0
.
3
f
;
int64_t
retryDelta
=
(
int64_t
)(
tsRetryStreamCompDelay
*
retryRangeFactor
);
retryDelta
=
(
(
rand
()
%
retryDelta
)
+
tsRetryStreamCompDelay
)
*
1000L
;
retryDelta
=
(
rand
()
%
retryDelta
)
+
tsRetryStreamCompDelay
;
if
(
pStream
->
interval
.
intervalUnit
!=
'n'
&&
pStream
->
interval
.
intervalUnit
!=
'y'
)
{
// change to ms
...
...
@@ -142,7 +143,6 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
if
(
pSql
==
NULL
)
{
return
;
}
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
);
tscDebug
(
"0x%"
PRIx64
" add into timer"
,
pSql
->
self
);
...
...
@@ -160,18 +160,19 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
}
else
{
pQueryInfo
->
window
.
skey
=
pStream
->
stime
;
int64_t
etime
=
taosGetTimestamp
(
pStream
->
precision
);
int64_t
one
=
convertTimePrecision
(
1
,
TSDB_TIME_PRECISION_MILLI
,
pStream
->
precision
);
// delay to wait all data in last time window
etime
-=
convertTimePrecision
(
tsMaxStreamComputDelay
,
TSDB_TIME_PRECISION_MILLI
,
pStream
->
precision
);
if
(
etime
>
pStream
->
etime
)
{
etime
=
pStream
->
etime
;
}
else
if
(
pStream
->
interval
.
intervalUnit
!=
'y'
&&
pStream
->
interval
.
intervalUnit
!=
'n'
)
{
if
(
pStream
->
stime
==
INT64_MIN
)
{
etime
=
taosTimeTruncate
(
etime
,
&
pStream
->
interval
,
pStream
->
precision
);
etime
=
taosTimeTruncate
(
etime
,
&
pStream
->
interval
,
pStream
->
precision
)
-
one
;
}
else
{
etime
=
pStream
->
stime
+
(
etime
-
pStream
->
stime
)
/
pStream
->
interval
.
interval
*
pStream
->
interval
.
interval
;
etime
=
pStream
->
stime
+
(
etime
-
pStream
->
stime
)
/
pStream
->
interval
.
interval
*
pStream
->
interval
.
interval
-
one
;
}
}
else
{
etime
=
taosTimeTruncate
(
etime
,
&
pStream
->
interval
,
pStream
->
precision
);
etime
=
taosTimeTruncate
(
etime
,
&
pStream
->
interval
,
pStream
->
precision
)
-
one
;
}
pQueryInfo
->
window
.
ekey
=
etime
;
if
(
pQueryInfo
->
window
.
skey
>=
pQueryInfo
->
window
.
ekey
)
{
...
...
@@ -179,15 +180,32 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
if
(
pStream
->
interval
.
intervalUnit
==
'y'
||
pStream
->
interval
.
intervalUnit
==
'n'
)
{
timer
=
86400
*
1000l
;
}
else
{
timer
=
convertTimePrecision
(
timer
,
pStream
->
precision
,
TSDB_TIME_PRECISION_MILLI
);
int32_t
loop
=
10000
;
int64_t
next_time
=
pStream
->
stime
;
while
(
1
)
{
// get next time
next_time
=
taosTimeAdd
(
next_time
,
pStream
->
interval
.
sliding
,
pStream
->
interval
.
intervalUnit
,
TSDB_TIME_PRECISION_MILLI
);
timer
=
next_time
-
taosGetTimestamp
(
pStream
->
precision
);
// next time - now()
if
(
timer
<
0
&&
--
loop
>
0
)
{
//tscDebug("CQ next time < now so loop add sliding. next_time=%" PRId64, next_time);
continue
;
}
// calc launch delay time
int64_t
delay
=
getLaunchTimeDelay
((
const
SSqlStream
*
)
pStream
);
timer
+=
delay
;
tscDebug
(
"CQ execute next query after %"
PRId64
"ms (delay=%"
PRId64
")"
,
timer
,
delay
);
break
;
}
}
tscSetRetryTimer
(
pStream
,
pSql
,
timer
);
return
;
}
}
tscDebug
(
"CQ ProcessStreamTimer skey=%"
PRId64
" ekey=%"
PRId64
" stime=%"
PRId64
" etime=%"
PRId64
,
pQueryInfo
->
window
.
skey
,
pQueryInfo
->
window
.
ekey
,
pStream
->
stime
,
pStream
->
etime
);
// launch stream computing in a new thread
SSchedMsg
schedMsg
=
{
0
};
SSchedMsg
schedMsg
=
{
0
};
schedMsg
.
fp
=
tscProcessStreamLaunchQuery
;
schedMsg
.
ahandle
=
pStream
;
schedMsg
.
thandle
=
(
void
*
)
1
;
...
...
@@ -195,8 +213,6 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
taosScheduleTask
(
tscQhandle
,
&
schedMsg
);
}
static
void
cbParseSql
(
void
*
param
,
TAOS_RES
*
res
,
int
code
);
static
void
tscProcessStreamQueryCallback
(
void
*
param
,
TAOS_RES
*
tres
,
int
numOfRows
)
{
SSqlStream
*
pStream
=
(
SSqlStream
*
)
param
;
if
(
tres
==
NULL
||
numOfRows
<
0
)
{
...
...
@@ -204,28 +220,23 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf
tscError
(
"0x%"
PRIx64
" stream:%p, query data failed, code:0x%08x, retry in %"
PRId64
"ms"
,
pStream
->
pSql
->
self
,
pStream
,
numOfRows
,
retryDelay
);
S
SqlObj
*
pSql
=
pStream
->
pSql
;
S
TableMetaInfo
*
pTableMetaInfo
=
tscGetTableMetaInfoFromCmd
(
&
pStream
->
pSql
->
cmd
,
0
)
;
tscFreeSqlResult
(
pSql
);
tscFreeSubobj
(
pSql
);
tfree
(
pSql
->
pSubs
);
pSql
->
subState
.
numOfSub
=
0
;
char
name
[
TSDB_TABLE_FNAME_LEN
]
=
{
0
};
tNameExtractFullName
(
&
pTableMetaInfo
->
name
,
name
);
pSql
->
parseRetry
=
0
;
int32_t
code
=
tsParseSql
(
pSql
,
true
);
if
(
code
==
TSDB_CODE_SUCCESS
)
{
cbParseSql
(
pStream
,
pSql
,
code
);
}
else
if
(
code
==
TSDB_CODE_TSC_ACTION_IN_PROGRESS
)
{
tscDebug
(
"0x%"
PRIx64
" CQ taso_open_stream IN Process"
,
pSql
->
self
);
}
else
{
tscError
(
"0x%"
PRIx64
" open stream failed, code:%s"
,
pSql
->
self
,
tstrerror
(
code
));
taosReleaseRef
(
tscObjRef
,
pSql
->
self
);
free
(
pStream
);
return
;
}
taosHashRemove
(
UTIL_GET_TABLEMETA
(
pStream
->
pSql
),
name
,
strnlen
(
name
,
TSDB_TABLE_FNAME_LEN
));
tfree
(
pTableMetaInfo
->
pTableMeta
);
tscFreeSqlResult
(
pStream
->
pSql
);
tscFreeSubobj
(
pStream
->
pSql
);
tfree
(
pStream
->
pSql
->
pSubs
);
pStream
->
pSql
->
subState
.
numOfSub
=
0
;
// tscSetRetryTimer(pStream, pStream->pSql, retryDelay);
// return;
pTableMetaInfo
->
vgroupList
=
tscVgroupInfoClear
(
pTableMetaInfo
->
vgroupList
);
tscSetRetryTimer
(
pStream
,
pStream
->
pSql
,
retryDelay
);
return
;
}
taos_fetch_rows_a
(
tres
,
tscProcessStreamRetrieveResult
,
param
);
...
...
@@ -272,14 +283,233 @@ static void tscStreamFillTimeGap(SSqlStream* pStream, TSKEY ts) {
#endif
}
// callback send values
int32_t
ok_cnt
=
0
;
int32_t
err_cnt
=
0
;
void
cbSendValues
(
void
*
param
,
TAOS_RES
*
res
,
int
code
)
{
if
(
code
<
0
)
{
err_cnt
++
;
tscError
(
"CQ Send Failed. code=0x%x ok_cnt=%d err_cnt=%d"
,
code
,
ok_cnt
,
err_cnt
);
}
else
{
ok_cnt
++
;
tscInfo
(
"CQ Send OK. row=%d ok_cnt=%d err_cnt=%d"
,
code
,
ok_cnt
,
err_cnt
);
}
}
// append values
size_t
appendValues
(
TAOS_FIELD
*
fields
,
int32_t
numCols
,
TAOS_ROW
row
,
char
*
pBuf
,
size_t
bufLen
,
size_t
curLen
,
bool
*
full
)
{
// calc buf is full
size_t
needLen
=
0
;
size_t
rowLen
=
0
;
int
i
;
for
(
i
=
0
;
i
<
numCols
;
i
++
)
{
needLen
+=
fields
[
i
].
bytes
;
}
// estimate length for use
needLen
+=
numCols
*
5
+
20
;
if
(
needLen
>=
bufLen
-
curLen
)
{
*
full
=
true
;
return
0
;
}
// row append to values
char
*
strRow
=
tmalloc
(
needLen
);
char
*
value
=
pBuf
+
curLen
;
strcpy
(
value
,
"("
);
rowLen
+=
taos_print_row_ex
(
strRow
,
row
,
fields
,
numCols
,
','
,
true
);
strcat
(
value
,
strRow
);
strcat
(
value
,
")"
);
rowLen
+=
2
;
tfree
(
strRow
);
return
rowLen
;
}
bool
sqlBufSend
(
TAOS
*
taos
,
char
*
sqlBuf
)
{
// if no enough free session, wait max 10s
int32_t
sleepCnt
=
0
;
do
{
int32_t
session
=
taos_unused_session
(
taos
);
if
(
session
>
1000
)
{
break
;
}
taosMsleep
(
500
);
tscInfo
(
"CQ session < 1000. session=%d Wait 0.5s cnt=%d"
,
session
,
sleepCnt
);
}
while
(
++
sleepCnt
<
20
);
strcat
(
sqlBuf
,
";"
);
taos_query_ra
(
taos
,
sqlBuf
,
cbSendValues
,
NULL
);
return
true
;
}
#define STR_SQL_INSERT "insert into "
// send one table all rows for once
bool
sendChildTalbe
(
TAOS
*
taos
,
char
*
superName
,
char
*
tableName
,
TAOS_FIELD
*
fields
,
int32_t
numCols
,
SArray
*
arr
,
char
*
sqlBuf
,
int32_t
bufLen
)
{
char
dbName
[
TSDB_DB_NAME_LEN
]
=
""
;
char
dbTable
[
TSDB_TABLE_FNAME_LEN
];
size_t
numRows
=
taosArrayGetSize
(
arr
);
if
(
numRows
==
0
)
return
false
;
// obtain dbname
char
*
p
=
strstr
(
superName
,
"."
);
if
(
p
)
{
// if have db prefix , under this db create table
int32_t
len
=
(
int32_t
)(
p
-
superName
);
strncpy
(
dbName
,
superName
,
len
);
dbName
[
len
]
=
0
;
// append str end
sprintf
(
dbTable
,
"%s.%s"
,
dbName
,
tableName
);
}
else
{
// no db prefix
strcpy
(
dbTable
,
tableName
);
}
// first enter
if
(
sqlBuf
[
0
]
==
0
)
{
strcpy
(
sqlBuf
,
STR_SQL_INSERT
);
}
else
{
// check need send
if
(
bufLen
-
strlen
(
sqlBuf
)
<
300
)
{
sqlBufSend
(
taos
,
sqlBuf
);
strcpy
(
sqlBuf
,
STR_SQL_INSERT
);
}
}
// init
int32_t
preLen
=
(
int32_t
)
strlen
(
sqlBuf
);
char
*
subBuf
=
sqlBuf
+
preLen
;
int32_t
subLen
=
bufLen
-
preLen
;
sprintf
(
subBuf
,
" %s using %s tags(0) values "
,
dbTable
,
superName
);
size_t
curLen
=
strlen
(
subBuf
);
TAOS_ROW
row
;
bool
full
=
false
;
for
(
size_t
i
=
0
;
i
<
numRows
;
i
++
)
{
row
=
(
TAOS_ROW
)
taosArrayGetP
(
arr
,
i
);
if
(
row
==
NULL
)
continue
;
if
(
subLen
>
200
)
curLen
+=
appendValues
(
fields
,
numCols
,
row
,
subBuf
,
subLen
-
100
,
curLen
,
&
full
);
else
full
=
true
;
if
(
full
)
{
// need send
// send current
sqlBufSend
(
taos
,
sqlBuf
);
// init reset
strcpy
(
sqlBuf
,
STR_SQL_INSERT
);
preLen
=
(
int32_t
)
strlen
(
sqlBuf
);
subBuf
=
sqlBuf
+
preLen
;
subLen
=
bufLen
-
preLen
;
sprintf
(
subBuf
,
" %s using %s tags(0) values "
,
dbTable
,
superName
);
curLen
=
strlen
(
subBuf
);
// retry append. if full is true again, ignore this row
curLen
+=
appendValues
(
fields
,
numCols
,
row
,
subBuf
,
subLen
-
100
,
curLen
,
&
full
);
full
=
false
;
// reset to false
}
tfree
(
row
);
}
return
true
;
}
// write cq result to another table
bool
toAnotherTable
(
STscObj
*
pTscObj
,
char
*
superName
,
TAOS_FIELD
*
fields
,
int32_t
numCols
,
SHashObj
*
tbHash
,
int32_t
numRows
)
{
int32_t
bufLen
=
TSDB_MAX_SQL_LEN
/
2
-
128
;
char
*
sqlBuf
=
tmalloc
(
bufLen
);
sqlBuf
[
0
]
=
0
;
// init
ok_cnt
=
0
;
err_cnt
=
0
;
int
cnt_table
=
0
;
void
*
pIter
=
taosHashIterate
(
tbHash
,
NULL
);
while
(
pIter
)
{
SArray
*
arr
=
*
(
SArray
**
)
pIter
;
if
(
arr
)
{
// get key as tableName
SHashNode
*
pNode
=
(
SHashNode
*
)
GET_HASH_PNODE
(
pIter
);
char
*
data
=
(
char
*
)
GET_HASH_NODE_KEY
(
pNode
);
uint32_t
len
=
pNode
->
keyLen
;
char
*
key
=
tmalloc
(
len
+
1
);
memcpy
(
key
,
data
,
len
);
key
[
len
]
=
0
;
// string end '\0'
// send all this table rows
sendChildTalbe
(
pTscObj
,
superName
,
key
,
fields
,
numCols
,
arr
,
sqlBuf
,
bufLen
);
// release SArray
taosArrayDestroy
(
&
arr
);
tfree
(
key
);
cnt_table
++
;
}
pIter
=
taosHashIterate
(
tbHash
,
pIter
);
}
if
(
sqlBuf
[
0
])
{
sqlBufSend
(
pTscObj
,
sqlBuf
);
}
tscInfo
(
"CQ ===== stream %d rows write to %d tables =====
\n
"
,
numRows
,
cnt_table
);
tfree
(
sqlBuf
);
return
true
;
}
// add row to hash to group by tbname
bool
tbHashAdd
(
SHashObj
*
tbHash
,
TAOS_ROW
row
,
TAOS_FIELD
*
fields
,
int32_t
idx
,
int32_t
numCols
)
{
void
*
v
=
row
[
idx
];
TAOS_FIELD
*
field
=
&
fields
[
idx
];
VarDataLenT
len
=
0
;
char
str
[
128
];
memset
(
str
,
0
,
sizeof
(
str
));
char
*
key
=
str
;
// get key and len
if
(
field
->
type
==
TSDB_DATA_TYPE_BINARY
||
field
->
type
==
TSDB_DATA_TYPE_NCHAR
)
{
key
=
v
;
len
=
varDataLen
((
char
*
)
v
-
VARSTR_HEADER_SIZE
);
}
else
{
len
=
taos_print_field
(
str
,
v
,
field
);
}
if
(
len
==
0
)
{
return
false
;
}
// append key with len
SArray
*
arr
=
NULL
;
void
*
pdata
=
taosHashGet
(
tbHash
,
key
,
len
);
if
(
pdata
)
{
arr
=
*
(
SArray
**
)
pdata
;
}
// if group is null create new
if
(
arr
==
NULL
)
{
arr
=
(
SArray
*
)
taosArrayInit
(
10
,
sizeof
(
TAOS_ROW
));
if
(
arr
==
NULL
)
{
tscError
(
"tbHashAdd tbHash:%p, taosArrayInit(10,sizeof(TAOS_ROW) return NULL."
,
tbHash
);
return
false
;
}
taosHashPut
(
tbHash
,
key
,
len
,
&
arr
,
sizeof
(
SArray
*
));
}
// append to group
int32_t
new_len
=
sizeof
(
void
*
)
*
numCols
;
TAOS_ROW
new_row
=
(
TAOS_ROW
)
tmalloc
(
new_len
);
memcpy
(
new_row
,
row
,
new_len
);
taosArrayPush
(
arr
,
&
new_row
);
return
true
;
}
static
void
tscProcessStreamRetrieveResult
(
void
*
param
,
TAOS_RES
*
res
,
int
numOfRows
)
{
SSqlStream
*
pStream
=
(
SSqlStream
*
)
param
;
SSqlObj
*
pSql
=
(
SSqlObj
*
)
res
;
bool
toAnother
=
pStream
->
to
!=
NULL
;
if
(
pSql
==
NULL
||
numOfRows
<
0
)
{
int64_t
retryDelayTime
=
tscGetRetryDelayTime
(
pStream
,
pStream
->
interval
.
sliding
,
pStream
->
precision
);
tscError
(
"stream:%p, retrieve data failed, code:0x%08x, retry in %"
PRId64
" ms"
,
pStream
,
numOfRows
,
retryDelayTime
);
tscSetRetryTimer
(
pStream
,
pStream
->
pSql
,
retryDelayTime
);
return
;
}
...
...
@@ -288,23 +518,72 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
STableMetaInfo
*
pTableMetaInfo
=
pQueryInfo
->
pTableMetaInfo
[
0
];
if
(
numOfRows
>
0
)
{
// when reaching here the first execution of stream computing is successful.
// init hash
SHashObj
*
tbHash
=
NULL
;
int32_t
colIdx
=
-
1
;
TAOS_FIELD
*
fields
=
NULL
;
int32_t
dstColsNum
=
pStream
->
dstCols
;
int32_t
fieldsNum
=
0
;
if
(
toAnother
)
{
//init hash
tbHash
=
taosHashInit
(
100
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_NO_LOCK
);
fields
=
taos_fetch_fields
(
res
);
fieldsNum
=
tscNumOfFields
(
pQueryInfo
);
if
(
dstColsNum
==
-
1
)
dstColsNum
=
fieldsNum
;
//search split column
char
*
split
=
"tbname"
;
// default
if
(
pStream
->
split
)
split
=
pStream
->
split
;
for
(
int32_t
i
=
1
;
i
<
fieldsNum
;
i
++
)
{
if
(
strcasecmp
(
fields
[
i
].
name
,
split
)
==
0
)
{
colIdx
=
i
;
break
;
}
}
// set default with last fields if
if
(
colIdx
==
-
1
)
{
colIdx
=
fieldsNum
-
1
;
}
}
// save rows
for
(
int32_t
i
=
0
;
i
<
numOfRows
;
++
i
)
{
TAOS_ROW
row
=
taos_fetch_row
(
res
);
if
(
row
!=
NULL
)
{
tscDebug
(
"0x%"
PRIx64
" stream:%p fetch result
"
,
pSql
->
self
,
pStream
);
tscDebug
(
"0x%"
PRIx64
" stream:%p fetch result
row=%d"
,
pSql
->
self
,
pStream
,
i
);
tscStreamFillTimeGap
(
pStream
,
*
(
TSKEY
*
)
row
[
0
]);
pStream
->
stime
=
*
(
TSKEY
*
)
row
[
0
];
// user callback function
// write to another table if true
if
(
toAnother
)
{
tbHashAdd
(
tbHash
,
row
,
fields
,
colIdx
,
dstColsNum
);
if
(
i
==
numOfRows
-
1
)
//write last row to record last query time avoid query from begin for each
(
*
pStream
->
fp
)(
pStream
->
param
,
res
,
row
);
}
else
{
(
*
pStream
->
fp
)(
pStream
->
param
,
res
,
row
);
}
pStream
->
numOfRes
++
;
}
}
// write Another
if
(
toAnother
)
{
toAnotherTable
(
pSql
->
pTscObj
,
pStream
->
to
,
fields
,
dstColsNum
,
tbHash
,
numOfRows
);
taosHashCleanup
(
tbHash
);
}
if
(
!
pStream
->
isProject
)
{
pStream
->
stime
=
taosTimeAdd
(
pStream
->
stime
,
pStream
->
interval
.
sliding
,
pStream
->
interval
.
slidingUnit
,
pStream
->
precision
);
}
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
(
pQueryInfo
&&
pQueryInfo
->
pQInfo
)
code
=
pQueryInfo
->
pQInfo
->
code
;
// actually only one row is returned. this following is not necessary
if
(
code
==
TSDB_CODE_SUCCESS
)
{
taos_fetch_rows_a
(
res
,
tscProcessStreamRetrieveResult
,
pStream
);
}
}
else
{
// numOfRows == 0, all data has been retrieved
pStream
->
useconds
+=
pSql
->
res
.
useconds
;
if
(
pStream
->
numOfRes
==
0
)
{
...
...
@@ -313,7 +592,6 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
// todo set retry dynamic time
int32_t
retry
=
tsProjectExecInterval
;
tscError
(
"0x%"
PRIx64
" stream:%p, retrieve no data, code:0x%08x, retry in %"
PRId32
"ms"
,
pSql
->
self
,
pStream
,
numOfRows
,
retry
);
tscSetRetryTimer
(
pStream
,
pStream
->
pSql
,
retry
);
return
;
}
...
...
@@ -378,28 +656,33 @@ static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer)
taosTmrReset
(
tscProcessStreamTimer
,
(
int32_t
)
timer
,
pStream
,
tscTmr
,
&
pStream
->
pTimer
);
}
// get need delay time for every launch to exeucte query, include first and next launch
static
int64_t
getLaunchTimeDelay
(
const
SSqlStream
*
pStream
)
{
// step 1 read setting delay time in taos.cfg
int64_t
maxDelay
=
convertTimePrecision
(
tsMaxStreamComputDelay
,
TSDB_TIME_PRECISION_MILLI
,
pStream
->
precision
);
int64_t
delayDelta
=
maxDelay
;
int64_t
ratioDelay
=
maxDelay
;
if
(
pStream
->
interval
.
intervalUnit
!=
'n'
&&
pStream
->
interval
.
intervalUnit
!=
'y'
)
{
delayDelta
=
(
int64_t
)(
pStream
->
interval
.
sliding
*
tsStreamComputDelayRatio
);
if
(
delayDelta
>
maxDelay
)
{
delayDelta
=
maxDelay
;
ratioDelay
=
(
int64_t
)(
pStream
->
interval
.
sliding
*
tsStreamComputDelayRatio
);
if
(
ratioDelay
>
maxDelay
)
{
ratioDelay
=
maxDelay
;
}
int64_t
remainTimeWindow
=
pStream
->
interval
.
sliding
-
delayDelta
;
int64_t
remainTimeWindow
=
pStream
->
interval
.
sliding
-
ratioDelay
;
if
(
maxDelay
>
remainTimeWindow
)
{
maxDelay
=
(
int64_t
)(
remainTimeWindow
/
1
.
5
f
);
}
}
int64_t
currentDelay
=
(
rand
()
%
maxDelay
);
// a random number
currentDelay
+=
delayDelta
;
// PART 2 calc allDelay = rand delay + fixed delay
int64_t
allDelay
=
(
rand
()
%
maxDelay
)
+
ratioDelay
;
if
(
pStream
->
interval
.
intervalUnit
!=
'n'
&&
pStream
->
interval
.
intervalUnit
!=
'y'
)
{
assert
(
currentDelay
<
pStream
->
interval
.
sliding
);
if
(
allDelay
>=
pStream
->
interval
.
sliding
)
{
tscWarn
(
"CQ delay >= sliding error. delay=%"
PRId64
" sliding=%"
PRId64
". so set delay=sliding/2."
,
allDelay
,
pStream
->
interval
.
sliding
);
allDelay
=
pStream
->
interval
.
sliding
/
2
;
}
}
return
currentDelay
;
tscDebug
(
"getLanchDelay allDelay=%"
PRId64
"(ratioDelay=%"
PRId64
")"
,
allDelay
,
ratioDelay
);
return
allDelay
;
}
...
...
@@ -424,8 +707,8 @@ static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) {
return
;
}
}
else
{
int64_t
stime
=
taosTimeTruncate
(
pStream
->
stime
-
1
,
&
pStream
->
interval
,
pStream
->
precision
);
if
(
stime
>=
pStream
->
etime
)
{
int64_t
tsc_
stime
=
taosTimeTruncate
(
pStream
->
stime
-
1
,
&
pStream
->
interval
,
pStream
->
precision
);
if
(
tsc_
stime
>=
pStream
->
etime
)
{
tscDebug
(
"0x%"
PRIx64
" stream:%p, stime:%"
PRId64
" is larger than end time: %"
PRId64
", stop the stream"
,
pStream
->
pSql
->
self
,
pStream
,
pStream
->
stime
,
pStream
->
etime
);
// TODO : How to terminate stream here
...
...
@@ -448,7 +731,6 @@ static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) {
timer
+=
getLaunchTimeDelay
(
pStream
);
timer
=
convertTimePrecision
(
timer
,
pStream
->
precision
,
TSDB_TIME_PRECISION_MILLI
);
tscSetRetryTimer
(
pStream
,
pSql
,
timer
);
}
...
...
@@ -505,7 +787,7 @@ static int32_t tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
return
TSDB_CODE_SUCCESS
;
}
static
int64_t
tscGetStreamStartTimestamp
(
SSqlObj
*
pSql
,
SSqlStream
*
pStream
,
int64_t
stime
)
{
static
int64_t
tscGetStreamStartTimestamp
(
SSqlObj
*
pSql
,
SSqlStream
*
pStream
,
int64_t
tsc_
stime
)
{
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
);
if
(
pStream
->
isProject
)
{
...
...
@@ -513,43 +795,45 @@ static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, in
pStream
->
interval
.
interval
=
tsProjectExecInterval
;
pStream
->
interval
.
sliding
=
tsProjectExecInterval
;
if
(
stime
!=
INT64_MIN
)
{
// first projection start from the latest event timestamp
assert
(
stime
>=
pQueryInfo
->
window
.
skey
);
stime
+=
1
;
// exclude the last records from table
if
(
tsc_
stime
!=
INT64_MIN
)
{
// first projection start from the latest event timestamp
assert
(
tsc_
stime
>=
pQueryInfo
->
window
.
skey
);
tsc_
stime
+=
1
;
// exclude the last records from table
}
else
{
stime
=
pQueryInfo
->
window
.
skey
;
tsc_
stime
=
pQueryInfo
->
window
.
skey
;
}
}
else
{
// timewindow based aggregation stream
if
(
stime
==
INT64_MIN
)
{
// no data in meter till now
if
(
tsc_
stime
==
INT64_MIN
)
{
// no data in meter till now
if
(
pQueryInfo
->
window
.
skey
!=
INT64_MIN
)
{
stime
=
pQueryInfo
->
window
.
skey
;
tsc_
stime
=
pQueryInfo
->
window
.
skey
;
}
else
{
return
stime
;
return
tsc_
stime
;
}
stime
=
taosTimeTruncate
(
stime
,
&
pStream
->
interval
,
pStream
->
precision
);
tsc_stime
=
taosTimeTruncate
(
tsc_
stime
,
&
pStream
->
interval
,
pStream
->
precision
);
}
else
{
int64_t
newStime
=
taosTimeTruncate
(
stime
,
&
pStream
->
interval
,
pStream
->
precision
);
if
(
newStime
!=
stime
)
{
tscWarn
(
"0x%"
PRIx64
" stream:%p, last timestamp:%"
PRId64
", reset to:%"
PRId64
,
pSql
->
self
,
pStream
,
stime
,
newStime
);
stime
=
newStime
;
int64_t
newStime
=
taosTimeTruncate
(
tsc_
stime
,
&
pStream
->
interval
,
pStream
->
precision
);
if
(
newStime
!=
tsc_
stime
)
{
tscWarn
(
"0x%"
PRIx64
" stream:%p, last timestamp:%"
PRId64
", reset to:%"
PRId64
,
pSql
->
self
,
pStream
,
tsc_
stime
,
newStime
);
tsc_
stime
=
newStime
;
}
}
}
return
stime
;
return
tsc_
stime
;
}
static
int64_t
tscGetLaunchTimestamp
(
const
SSqlStream
*
pStream
)
{
static
int64_t
tscGetFirstLaunchTime
(
const
SSqlStream
*
pStream
)
{
// PART 1 now to stime span
int64_t
timer
=
0
,
now
=
taosGetTimestamp
(
pStream
->
precision
);
if
(
pStream
->
stime
>
now
)
{
timer
=
pStream
->
stime
-
now
;
}
int64_t
startDelay
=
convertTimePrecision
(
tsStreamCompStartDelay
,
TSDB_TIME_PRECISION_MILLI
,
pStream
->
precision
);
// PART 2 stream first Launch need delay, setting with taos.cfg
timer
+=
convertTimePrecision
(
tsFirstLaunchDelay
,
TSDB_TIME_PRECISION_MILLI
,
pStream
->
precision
);
// PART 3 every launch need delay, include first and next launch
timer
+=
getLaunchTimeDelay
(
pStream
);
timer
+=
startDelay
;
return
convertTimePrecision
(
timer
,
pStream
->
precision
,
TSDB_TIME_PRECISION_MILLI
);
}
...
...
@@ -562,6 +846,7 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
pSql
->
res
.
code
=
code
;
tscError
(
"0x%"
PRIx64
" open stream failed, sql:%s, reason:%s, code:%s"
,
pSql
->
self
,
pSql
->
sqlstr
,
pCmd
->
payload
,
tstrerror
(
code
));
pStream
->
fp
(
pStream
->
param
,
NULL
,
NULL
);
return
;
}
...
...
@@ -588,17 +873,17 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
// set stime with ltime if ltime > stime
const
char
*
dstTable
=
pStream
->
dstTable
?
pStream
->
dstTable
:
""
;
tscDebug
(
"0x%"
PRIx64
" CQ table %s ltime is %"
PRId64
,
pSql
->
self
,
dstTable
,
pStream
->
ltime
);
tscDebug
(
" CQ table=%s ltime is %"
PRId64
,
dstTable
,
pStream
->
ltime
);
if
(
pStream
->
ltime
!=
INT64_MIN
&&
pStream
->
ltime
>
pStream
->
stime
)
{
tscWarn
(
"
0x%"
PRIx64
" CQ set stream %s stime=%"
PRId64
" replace with ltime=%"
PRId64
" if ltime > 0"
,
pSql
->
self
,
dstTable
,
pStream
->
stime
,
pStream
->
ltime
);
tscWarn
(
"
CQ set stream %s stime=%"
PRId64
" replace with ltime=%"
PRId64
" if ltime>0 "
,
dstTable
,
pStream
->
stime
,
pStream
->
ltime
);
pStream
->
stime
=
pStream
->
ltime
;
}
int64_t
starttime
=
tscGet
LaunchTimestamp
(
pStream
);
int64_t
starttime
=
tscGet
FirstLaunchTime
(
pStream
);
pCmd
->
command
=
TSDB_SQL_SELECT
;
tscAddIntoStreamList
(
pStream
);
taosTmrReset
(
tscProcessStreamTimer
,
(
int32_t
)
starttime
,
pStream
,
tscTmr
,
&
pStream
->
pTimer
);
tscDebug
(
"0x%"
PRIx64
" stream:%p is opened, query on:%s, interval:%"
PRId64
", sliding:%"
PRId64
", first launched in:%"
PRId64
", sql:%s"
,
pSql
->
self
,
...
...
@@ -665,10 +950,54 @@ void cbParseSql(void* param, TAOS_RES* res, int code) {
char
sql
[
128
]
=
""
;
sprintf
(
sql
,
"select last_row(*) from %s;"
,
pStream
->
dstTable
);
taos_query_a
(
pSql
->
pTscObj
,
sql
,
fpStreamLastRow
,
param
);
return
;
}
TAOS_STREAM
*
taos_open_stream_withname
(
TAOS
*
taos
,
const
char
*
dstTable
,
const
char
*
sqlstr
,
void
(
*
fp
)(
void
*
,
TAOS_RES
*
,
TAOS_ROW
),
int64_t
stime
,
void
*
param
,
void
(
*
callback
)(
void
*
),
void
*
cqhandle
)
{
void
splitStreamSql
(
const
char
*
str
,
char
**
sql
,
char
**
to
,
char
**
split
)
{
// OLD FORMAT only sql
if
(
strncmp
(
str
,
LABEL_SQL
,
LABEL_SQL_LEN
)
!=
0
)
{
*
sql
=
tmalloc
(
strlen
(
str
)
+
1
);
if
(
*
sql
==
NULL
)
return
;
strcpy
(
*
sql
,
str
);
return
;
}
// NEW FORMAT sql:...to:...split:...
char
*
p1
=
strstr
(
str
+
LABEL_SQL_LEN
,
LABEL_TO
);
if
(
p1
==
NULL
)
{
char
*
p
=
(
char
*
)
str
+
LABEL_SQL_LEN
;
*
sql
=
(
char
*
)
tmalloc
(
strlen
(
p
)
+
1
);
strcpy
(
*
sql
,
p
);
return
;
}
// SQL value
int32_t
len
=
(
int32_t
)(
p1
-
str
-
LABEL_SQL_LEN
);
*
sql
=
(
char
*
)
tmalloc
(
len
+
1
);
strncpy
(
*
sql
,
str
+
LABEL_SQL_LEN
,
len
);
(
*
sql
)[
len
]
=
0
;
// str end
// TO value
char
*
p2
=
strstr
(
p1
+
LABEL_TO_LEN
,
LABEL_SPLIT
);
if
(
p2
==
NULL
)
{
char
*
p
=
p1
+
LABEL_TO_LEN
;
*
to
=
(
char
*
)
tmalloc
(
strlen
(
p
)
+
1
);
strcpy
(
*
to
,
p
);
return
;
}
len
=
(
int32_t
)(
p2
-
p1
-
LABEL_TO_LEN
);
*
to
=
(
char
*
)
tmalloc
(
len
+
1
);
strncpy
(
*
to
,
p1
+
LABEL_TO_LEN
,
len
);
(
*
to
)[
len
]
=
0
;
// str end
// SPLIT value
char
*
p
=
p2
+
LABEL_SPLIT_LEN
;
*
split
=
(
char
*
)
tmalloc
(
strlen
(
p
)
+
1
);
strcpy
(
*
split
,
p
);
}
TAOS_STREAM
*
taos_open_stream_withname
(
TAOS
*
taos
,
const
char
*
dstTable
,
int32_t
dstCols
,
const
char
*
sqlstr
,
void
(
*
fp
)(
void
*
param
,
TAOS_RES
*
,
TAOS_ROW
row
),
int64_t
tsc_stime
,
void
*
param
,
void
(
*
callback
)(
void
*
),
void
*
cqhandle
)
{
STscObj
*
pObj
=
(
STscObj
*
)
taos
;
if
(
pObj
==
NULL
||
pObj
->
signature
!=
pObj
)
return
NULL
;
...
...
@@ -697,19 +1026,26 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const c
}
pStream
->
ltime
=
INT64_MIN
;
pStream
->
stime
=
stime
;
pStream
->
stime
=
tsc_
stime
;
pStream
->
fp
=
fp
;
pStream
->
callback
=
callback
;
pStream
->
param
=
param
;
pStream
->
pSql
=
pSql
;
pStream
->
cqhandle
=
cqhandle
;
pStream
->
dstCols
=
dstCols
;
pStream
->
to
=
NULL
;
pStream
->
split
=
NULL
;
pSql
->
pStream
=
pStream
;
pSql
->
param
=
pStream
;
pSql
->
maxRetry
=
TSDB_MAX_REPLICA
;
tscSetStreamDestTable
(
pStream
,
dstTable
);
pSql
->
pStream
=
pStream
;
pSql
->
param
=
pStream
;
pSql
->
maxRetry
=
TSDB_MAX_REPLICA
;
pSql
->
sqlstr
=
calloc
(
1
,
strlen
(
sqlstr
)
+
1
);
// split stream sqlstr to sql,to,split
splitStreamSql
(
sqlstr
,
&
pSql
->
sqlstr
,
&
pStream
->
to
,
&
pStream
->
split
);
if
(
pSql
->
sqlstr
==
NULL
)
{
tscError
(
"0x%"
PRIx64
" failed to malloc sql string buffer"
,
pSql
->
self
);
tscFreeSqlObj
(
pSql
);
...
...
@@ -717,7 +1053,7 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const c
return
NULL
;
}
strtolower
(
pSql
->
sqlstr
,
sqlstr
);
strtolower
(
pSql
->
sqlstr
,
pSql
->
sqlstr
);
pSql
->
fp
=
tscCreateStream
;
pSql
->
fetchFp
=
tscCreateStream
;
pSql
->
cmd
.
resColumnId
=
TSDB_RES_COL_ID
;
...
...
@@ -729,13 +1065,14 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const c
pSql
->
fp
=
cbParseSql
;
pSql
->
fetchFp
=
cbParseSql
;
registerSqlObj
(
pSql
);
int32_t
code
=
tsParseSql
(
pSql
,
true
);
if
(
code
==
TSDB_CODE_SUCCESS
)
{
cbParseSql
(
pStream
,
pSql
,
code
);
}
else
if
(
code
==
TSDB_CODE_TSC_ACTION_IN_PROGRESS
)
{
tscDebug
(
"
0x%"
PRIx64
" CQ taso_open_stream IN Process"
,
pSql
->
self
);
tscDebug
(
"
CQ taos_open_stream IN Process. sql=%s"
,
sqlstr
);
}
else
{
tscError
(
"0x%"
PRIx64
" open stream failed, sql:%s, code:%s"
,
pSql
->
self
,
sqlstr
,
tstrerror
(
code
));
taosReleaseRef
(
tscObjRef
,
pSql
->
self
);
...
...
@@ -746,9 +1083,9 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const c
return
pStream
;
}
TAOS_STREAM
*
taos_open_stream
(
TAOS
*
taos
,
const
char
*
sqlstr
,
void
(
*
fp
)(
void
*
,
TAOS_RES
*
,
TAOS_ROW
),
int64_t
stime
,
void
*
param
,
void
(
*
callback
)(
void
*
))
{
return
taos_open_stream_withname
(
taos
,
""
,
sqlstr
,
fp
,
stime
,
param
,
callback
,
NULL
);
TAOS_STREAM
*
taos_open_stream
(
TAOS
*
taos
,
const
char
*
sqlstr
,
void
(
*
fp
)(
void
*
param
,
TAOS_RES
*
,
TAOS_ROW
row
),
int64_t
tsc_
stime
,
void
*
param
,
void
(
*
callback
)(
void
*
))
{
return
taos_open_stream_withname
(
taos
,
""
,
-
1
,
sqlstr
,
fp
,
tsc_
stime
,
param
,
callback
,
NULL
);
}
void
taos_close_stream
(
TAOS_STREAM
*
handle
)
{
...
...
@@ -774,6 +1111,16 @@ void taos_close_stream(TAOS_STREAM *handle) {
pStream
->
fp
(
pStream
->
param
,
NULL
,
NULL
);
taos_free_result
(
pSql
);
// free malloc
if
(
pStream
->
to
)
{
tfree
(
pStream
->
to
);
pStream
->
to
=
NULL
;
}
if
(
pStream
->
split
)
{
tfree
(
pStream
->
split
);
pStream
->
split
=
NULL
;
}
tfree
(
pStream
);
}
}
src/common/inc/tglobal.h
浏览文件 @
f1c1501b
...
...
@@ -83,7 +83,7 @@ extern int32_t tsMaxNumOfOrderedResults;
extern
int32_t
tsMinSlidingTime
;
extern
int32_t
tsMinIntervalTime
;
extern
int32_t
tsMaxStreamComputDelay
;
extern
int32_t
ts
StreamCompStart
Delay
;
extern
int32_t
ts
FirstLaunch
Delay
;
extern
int32_t
tsRetryStreamCompDelay
;
extern
float
tsStreamComputDelayRatio
;
// the delayed computing ration of the whole time window
extern
int32_t
tsProjectExecInterval
;
...
...
src/common/src/tglobal.c
浏览文件 @
f1c1501b
...
...
@@ -107,11 +107,11 @@ int32_t tsMinIntervalTime = 1;
// 20sec, the maximum value of stream computing delay, changed accordingly
int32_t
tsMaxStreamComputDelay
=
20000
;
// 10sec, the
first stream computing
delay time after system launched successfully, changed accordingly
int32_t
ts
StreamCompStart
Delay
=
10000
;
// 10sec, the
stream first launched to execute
delay time after system launched successfully, changed accordingly
int32_t
ts
FirstLaunch
Delay
=
10000
;
// the stream computing delay time after executing failed, change accordingly
int32_t
tsRetryStreamCompDelay
=
1
0
*
1000
;
int32_t
tsRetryStreamCompDelay
=
30
*
6
0
*
1000
;
// The delayed computing ration. 10% of the whole computing time window by default.
float
tsStreamComputDelayRatio
=
0
.
1
f
;
...
...
@@ -798,7 +798,7 @@ static void doInitGlobalConfig(void) {
taosInitConfigOption
(
cfg
);
cfg
.
option
=
"maxFirstStreamCompDelay"
;
cfg
.
ptr
=
&
ts
StreamCompStart
Delay
;
cfg
.
ptr
=
&
ts
FirstLaunch
Delay
;
cfg
.
valType
=
TAOS_CFG_VTYPE_INT32
;
cfg
.
cfgType
=
TSDB_CFG_CTYPE_B_CONFIG
|
TSDB_CFG_CTYPE_B_SHOW
;
cfg
.
minValue
=
1000
;
...
...
src/cq/src/cqMain.c
浏览文件 @
f1c1501b
...
...
@@ -423,7 +423,7 @@ static void cqProcessCreateTimer(void *param, void *tmrId) {
}
// inner implement in tscStream.c
TAOS_STREAM
*
taos_open_stream_withname
(
TAOS
*
taos
,
const
char
*
desName
,
const
char
*
sqlstr
,
void
(
*
fp
)(
void
*
param
,
TAOS_RES
*
,
TAOS_ROW
row
),
TAOS_STREAM
*
taos_open_stream_withname
(
TAOS
*
taos
,
const
char
*
desName
,
int32_t
dstCols
,
const
char
*
sqlstr
,
void
(
*
fp
)(
void
*
param
,
TAOS_RES
*
,
TAOS_ROW
row
),
int64_t
stime
,
void
*
param
,
void
(
*
callback
)(
void
*
),
void
*
cqhandle
);
static
void
cqCreateStream
(
SCqContext
*
pContext
,
SCqObj
*
pObj
)
{
...
...
@@ -436,9 +436,11 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
}
pObj
->
tmrId
=
0
;
int32_t
dstCols
=
-
1
;
if
(
pObj
->
pSchema
)
dstCols
=
pObj
->
pSchema
->
numOfCols
;
if
(
pObj
->
pStream
==
NULL
)
{
pObj
->
pStream
=
taos_open_stream_withname
(
pContext
->
dbConn
,
pObj
->
dstTable
,
pObj
->
sqlStr
,
cqProcessStreamRes
,
\
pObj
->
pStream
=
taos_open_stream_withname
(
pContext
->
dbConn
,
pObj
->
dstTable
,
dstCols
,
pObj
->
sqlStr
,
cqProcessStreamRes
,
\
INT64_MIN
,
(
void
*
)
pObj
->
rid
,
NULL
,
pContext
);
// TODO the pObj->pStream may be released if error happens
...
...
src/inc/taos.h
浏览文件 @
f1c1501b
...
...
@@ -173,7 +173,11 @@ DLL_EXPORT int taos_num_fields(TAOS_RES *res);
DLL_EXPORT
int
taos_affected_rows
(
TAOS_RES
*
res
);
DLL_EXPORT
TAOS_FIELD
*
taos_fetch_fields
(
TAOS_RES
*
res
);
DLL_EXPORT
int
taos_select_db
(
TAOS
*
taos
,
const
char
*
db
);
// row to string
DLL_EXPORT
int
taos_print_row
(
char
*
str
,
TAOS_ROW
row
,
TAOS_FIELD
*
fields
,
int
num_fields
);
DLL_EXPORT
int
taos_print_row_ex
(
char
*
str
,
TAOS_ROW
row
,
TAOS_FIELD
*
fields
,
int
num_fields
,
char
split
,
bool
addQuota
);
// one field to string
DLL_EXPORT
int
taos_print_field
(
char
*
str
,
void
*
value
,
TAOS_FIELD
*
field
);
DLL_EXPORT
void
taos_stop_query
(
TAOS_RES
*
res
);
DLL_EXPORT
bool
taos_is_null
(
TAOS_RES
*
res
,
int32_t
row
,
int32_t
col
);
DLL_EXPORT
bool
taos_is_update_query
(
TAOS_RES
*
res
);
...
...
src/inc/trpc.h
浏览文件 @
f1c1501b
...
...
@@ -92,6 +92,7 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
void
rpcSendRecv
(
void
*
shandle
,
SRpcEpSet
*
pEpSet
,
SRpcMsg
*
pReq
,
SRpcMsg
*
pRsp
);
int
rpcReportProgress
(
void
*
pConn
,
char
*
pCont
,
int
contLen
);
void
rpcCancelRequest
(
int64_t
rid
);
int32_t
rpcUnusedSession
(
void
*
rpcInfo
,
bool
bLock
);
#ifdef __cplusplus
}
...
...
src/inc/ttokendef.h
浏览文件 @
f1c1501b
...
...
@@ -136,86 +136,88 @@
#define TK_UNSIGNED 118
#define TK_TAGS 119
#define TK_USING 120
#define TK_NULL 121
#define TK_NOW 122
#define TK_VARIABLE 123
#define TK_SELECT 124
#define TK_UNION 125
#define TK_ALL 126
#define TK_DISTINCT 127
#define TK_FROM 128
#define TK_RANGE 129
#define TK_INTERVAL 130
#define TK_EVERY 131
#define TK_SESSION 132
#define TK_STATE_WINDOW 133
#define TK_FILL 134
#define TK_SLIDING 135
#define TK_ORDER 136
#define TK_BY 137
#define TK_ASC 138
#define TK_GROUP 139
#define TK_HAVING 140
#define TK_LIMIT 141
#define TK_OFFSET 142
#define TK_SLIMIT 143
#define TK_SOFFSET 144
#define TK_WHERE 145
#define TK_RESET 146
#define TK_QUERY 147
#define TK_SYNCDB 148
#define TK_ADD 149
#define TK_COLUMN 150
#define TK_MODIFY 151
#define TK_TAG 152
#define TK_CHANGE 153
#define TK_SET 154
#define TK_KILL 155
#define TK_CONNECTION 156
#define TK_STREAM 157
#define TK_COLON 158
#define TK_ABORT 159
#define TK_AFTER 160
#define TK_ATTACH 161
#define TK_BEFORE 162
#define TK_BEGIN 163
#define TK_CASCADE 164
#define TK_CLUSTER 165
#define TK_CONFLICT 166
#define TK_COPY 167
#define TK_DEFERRED 168
#define TK_DELIMITERS 169
#define TK_DETACH 170
#define TK_EACH 171
#define TK_END 172
#define TK_EXPLAIN 173
#define TK_FAIL 174
#define TK_FOR 175
#define TK_IGNORE 176
#define TK_IMMEDIATE 177
#define TK_INITIALLY 178
#define TK_INSTEAD 179
#define TK_KEY 180
#define TK_OF 181
#define TK_RAISE 182
#define TK_REPLACE 183
#define TK_RESTRICT 184
#define TK_ROW 185
#define TK_STATEMENT 186
#define TK_TRIGGER 187
#define TK_VIEW 188
#define TK_IPTOKEN 189
#define TK_SEMI 190
#define TK_NONE 191
#define TK_PREV 192
#define TK_LINEAR 193
#define TK_IMPORT 194
#define TK_TBNAME 195
#define TK_JOIN 196
#define TK_INSERT 197
#define TK_INTO 198
#define TK_VALUES 199
#define TK_FILE 200
#define TK_TO 121
#define TK_SPLIT 122
#define TK_NULL 123
#define TK_NOW 124
#define TK_VARIABLE 125
#define TK_SELECT 126
#define TK_UNION 127
#define TK_ALL 128
#define TK_DISTINCT 129
#define TK_FROM 130
#define TK_RANGE 131
#define TK_INTERVAL 132
#define TK_EVERY 133
#define TK_SESSION 134
#define TK_STATE_WINDOW 135
#define TK_FILL 136
#define TK_SLIDING 137
#define TK_ORDER 138
#define TK_BY 139
#define TK_ASC 140
#define TK_GROUP 141
#define TK_HAVING 142
#define TK_LIMIT 143
#define TK_OFFSET 144
#define TK_SLIMIT 145
#define TK_SOFFSET 146
#define TK_WHERE 147
#define TK_RESET 148
#define TK_QUERY 149
#define TK_SYNCDB 150
#define TK_ADD 151
#define TK_COLUMN 152
#define TK_MODIFY 153
#define TK_TAG 154
#define TK_CHANGE 155
#define TK_SET 156
#define TK_KILL 157
#define TK_CONNECTION 158
#define TK_STREAM 159
#define TK_COLON 160
#define TK_ABORT 161
#define TK_AFTER 162
#define TK_ATTACH 163
#define TK_BEFORE 164
#define TK_BEGIN 165
#define TK_CASCADE 166
#define TK_CLUSTER 167
#define TK_CONFLICT 168
#define TK_COPY 169
#define TK_DEFERRED 170
#define TK_DELIMITERS 171
#define TK_DETACH 172
#define TK_EACH 173
#define TK_END 174
#define TK_EXPLAIN 175
#define TK_FAIL 176
#define TK_FOR 177
#define TK_IGNORE 178
#define TK_IMMEDIATE 179
#define TK_INITIALLY 180
#define TK_INSTEAD 181
#define TK_KEY 182
#define TK_OF 183
#define TK_RAISE 184
#define TK_REPLACE 185
#define TK_RESTRICT 186
#define TK_ROW 187
#define TK_STATEMENT 188
#define TK_TRIGGER 189
#define TK_VIEW 190
#define TK_IPTOKEN 191
#define TK_SEMI 192
#define TK_NONE 193
#define TK_PREV 194
#define TK_LINEAR 195
#define TK_IMPORT 196
#define TK_TBNAME 197
#define TK_JOIN 198
#define TK_INSERT 199
#define TK_INTO 200
#define TK_VALUES 201
#define TK_FILE 202
#define TK_SPACE 300
...
...
src/mnode/src/mnodeTable.c
浏览文件 @
f1c1501b
...
...
@@ -851,7 +851,7 @@ static int32_t mnodeProcessBatchCreateTableMsg(SMnodeMsg *pMsg) {
memcpy
(
pSubMsg
->
pCont
+
sizeof
(
SCMCreateTableMsg
),
p
,
htonl
(
p
->
len
));
code
=
mnodeValidateCreateTableMsg
(
p
,
pSubMsg
);
if
(
code
==
TSDB_CODE_SUCCESS
||
code
==
TSDB_CODE_MND_TABLE_ALREADY_EXIST
)
{
if
(
code
==
TSDB_CODE_SUCCESS
||
(
p
->
igExists
==
1
&&
code
==
TSDB_CODE_MND_TABLE_ALREADY_EXIST
)
)
{
++
pSubMsg
->
pBatchMasterMsg
->
successed
;
mnodeDestroySubMsg
(
pSubMsg
);
continue
;
...
...
@@ -1046,11 +1046,21 @@ static int32_t mnodeCreateSuperTableCb(SMnodeMsg *pMsg, int32_t code) {
if
(
code
==
TSDB_CODE_SUCCESS
)
{
mLInfo
(
"stable:%s, is created in sdb, uid:%"
PRIu64
,
pTable
->
info
.
tableId
,
pTable
->
uid
);
if
(
pMsg
->
pBatchMasterMsg
)
pMsg
->
pBatchMasterMsg
->
successed
++
;
}
else
{
mError
(
"msg:%p, app:%p stable:%s, failed to create in sdb, reason:%s"
,
pMsg
,
pMsg
->
rpcMsg
.
ahandle
,
pTable
->
info
.
tableId
,
tstrerror
(
code
));
SSdbRow
desc
=
{.
type
=
SDB_OPER_GLOBAL
,
.
pObj
=
pTable
,
.
pTable
=
tsSuperTableSdb
};
sdbDeleteRow
(
&
desc
);
if
(
pMsg
->
pBatchMasterMsg
)
pMsg
->
pBatchMasterMsg
->
received
++
;
}
// if super table create by batch msg, check done and send finished to client
if
(
pMsg
->
pBatchMasterMsg
)
{
if
(
pMsg
->
pBatchMasterMsg
->
successed
+
pMsg
->
pBatchMasterMsg
->
received
>=
pMsg
->
pBatchMasterMsg
->
expected
)
dnodeSendRpcMWriteRsp
(
pMsg
->
pBatchMasterMsg
,
code
);
}
return
code
;
...
...
src/query/inc/qSqlparser.h
浏览文件 @
f1c1501b
...
...
@@ -25,6 +25,7 @@ extern "C" {
#include "tstrbuild.h"
#include "ttoken.h"
#include "tvariant.h"
#include "tname.h"
#define ParseTOKENTYPE SStrToken
...
...
@@ -156,8 +157,11 @@ typedef struct SCreatedTableInfo {
typedef
struct
SCreateTableSql
{
SStrToken
name
;
// table name, create table [name] xxx
SStrToken
to
;
// create stream to anohter table
SStrToken
split
;
// split columns
int8_t
type
;
// create normal table/from super table/ stream
bool
existCheck
;
SName
toSName
;
struct
{
SArray
*
pTagColumns
;
// SArray<TAOS_FIELD>
...
...
@@ -334,6 +338,7 @@ SArray *setSubclause(SArray *pList, void *pSqlNode);
SArray
*
appendSelectClause
(
SArray
*
pList
,
void
*
pSubclause
);
void
setCreatedTableName
(
SSqlInfo
*
pInfo
,
SStrToken
*
pTableNameToken
,
SStrToken
*
pIfNotExists
);
void
setCreatedStreamOpt
(
SSqlInfo
*
pInfo
,
SStrToken
*
pTo
,
SStrToken
*
pSplit
);
void
SqlInfoDestroy
(
SSqlInfo
*
pInfo
);
...
...
src/query/inc/sql.y
浏览文件 @
f1c1501b
...
...
@@ -411,14 +411,28 @@ tagNamelist(A) ::= ids(X). {A = taosArrayInit(4, sizeof(SSt
// create stream
// create table table_name as select count(*) from super_table_name interval(time)
create_table_args(A) ::= ifnotexists(U) ids(V) cpxName(Z) AS select(S). {
create_table_args(A) ::= ifnotexists(U) ids(V) cpxName(Z)
to_opt(E) split_opt(F)
AS select(S). {
A = tSetCreateTableInfo(NULL, NULL, S, TSQL_CREATE_STREAM);
setSqlInfo(pInfo, A, NULL, TSDB_SQL_CREATE_TABLE);
setCreatedStreamOpt(pInfo, &E, &F);
V.n += Z.n;
setCreatedTableName(pInfo, &V, &U);
}
// to_opt
%type to_opt {SStrToken}
to_opt(A) ::= . {A.n = 0;}
to_opt(A) ::= TO ids(X) cpxName(Y). {
A = X;
A.n += Y.n;
}
// split_opt
%type to_split {SStrToken}
split_opt(A) ::= . {A.n = 0;}
split_opt(A) ::= SPLIT ids(X). { A = X;}
%type column{TAOS_FIELD}
%type columnlist{SArray*}
%destructor columnlist {taosArrayDestroy(&$$);}
...
...
src/query/src/qSqlParser.c
浏览文件 @
f1c1501b
...
...
@@ -1244,6 +1244,11 @@ void setCreatedTableName(SSqlInfo *pInfo, SStrToken *pTableNameToken, SStrToken
pInfo
->
pCreateTableInfo
->
existCheck
=
(
pIfNotExists
->
n
!=
0
);
}
void
setCreatedStreamOpt
(
SSqlInfo
*
pInfo
,
SStrToken
*
pTo
,
SStrToken
*
pSplit
)
{
pInfo
->
pCreateTableInfo
->
to
=
*
pTo
;
pInfo
->
pCreateTableInfo
->
split
=
*
pSplit
;
}
void
setDCLSqlElems
(
SSqlInfo
*
pInfo
,
int32_t
type
,
int32_t
nParam
,
...)
{
pInfo
->
type
=
type
;
if
(
nParam
==
0
)
{
...
...
src/query/src/queryMain.c
浏览文件 @
f1c1501b
...
...
@@ -225,51 +225,6 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
return
code
;
}
#ifdef TEST_IMPL
// wait moment
int
waitMoment
(
SQInfo
*
pQInfo
){
if
(
pQInfo
->
sql
)
{
int
ms
=
0
;
char
*
pcnt
=
strstr
(
pQInfo
->
sql
,
" count(*)"
);
if
(
pcnt
)
return
0
;
char
*
pos
=
strstr
(
pQInfo
->
sql
,
" t_"
);
if
(
pos
){
pos
+=
3
;
ms
=
atoi
(
pos
);
while
(
*
pos
>=
'0'
&&
*
pos
<=
'9'
){
pos
++
;
}
char
unit_char
=
*
pos
;
if
(
unit_char
==
'h'
){
ms
*=
3600
*
1000
;
}
else
if
(
unit_char
==
'm'
){
ms
*=
60
*
1000
;
}
else
if
(
unit_char
==
's'
){
ms
*=
1000
;
}
}
if
(
ms
==
0
)
return
0
;
printf
(
"test wait sleep %dms. sql=%s ...
\n
"
,
ms
,
pQInfo
->
sql
);
if
(
ms
<
1000
)
{
taosMsleep
(
ms
);
}
else
{
int
used_ms
=
0
;
while
(
used_ms
<
ms
)
{
taosMsleep
(
1000
);
used_ms
+=
1000
;
if
(
isQueryKilled
(
pQInfo
)){
printf
(
"test check query is canceled, sleep break.%s
\n
"
,
pQInfo
->
sql
);
break
;
}
}
}
}
return
1
;
}
#endif
bool
qTableQuery
(
qinfo_t
qinfo
,
uint64_t
*
qId
)
{
SQInfo
*
pQInfo
=
(
SQInfo
*
)
qinfo
;
assert
(
pQInfo
&&
pQInfo
->
signature
==
pQInfo
);
...
...
src/query/src/sql.c
浏览文件 @
f1c1501b
因为 它太大了无法显示 source diff 。你可以改为
查看blob
。
src/rpc/src/rpcMain.c
浏览文件 @
f1c1501b
...
...
@@ -1677,3 +1677,9 @@ static void rpcDecRef(SRpcInfo *pRpc)
}
}
int32_t
rpcUnusedSession
(
void
*
rpcInfo
,
bool
bLock
)
{
SRpcInfo
*
info
=
(
SRpcInfo
*
)
rpcInfo
;
if
(
info
==
NULL
)
return
0
;
return
taosIdPoolNumOfFree
(
info
->
idPool
,
bLock
);
}
\ No newline at end of file
src/util/inc/tidpool.h
浏览文件 @
f1c1501b
...
...
@@ -36,6 +36,9 @@ int taosIdPoolNumOfUsed(void *handle);
bool
taosIdPoolMarkStatus
(
void
*
handle
,
int
id
);
// get free count from pool , if bLock is true, locked pool than get free count, accuracy but slowly
int
taosIdPoolNumOfFree
(
void
*
handle
,
bool
bLock
);
#ifdef __cplusplus
}
#endif
...
...
src/util/inc/ttoken.h
浏览文件 @
f1c1501b
...
...
@@ -63,6 +63,15 @@ uint32_t tGetToken(char *z, uint32_t *tokenType);
*/
SStrToken
tStrGetToken
(
char
*
str
,
int32_t
*
i
,
bool
isPrevOptr
);
/**
* strcpy implement source from SStrToken
*
* @param dst copy to
* @param srcToken copy from
* @return size of copy successful bytes, not include '\0'
*/
int32_t
tStrNCpy
(
char
*
dst
,
SStrToken
*
srcToken
);
/**
* check if it is a keyword or not
* @param z
...
...
src/util/src/tidpool.c
浏览文件 @
f1c1501b
...
...
@@ -164,3 +164,15 @@ int taosIdPoolMaxSize(void *handle) {
return
ret
;
}
// get free count from pool , if bLock is true, locked pool than get free count, accuracy but slowly
int
taosIdPoolNumOfFree
(
void
*
handle
,
bool
bLock
)
{
id_pool_t
*
pIdPool
=
handle
;
if
(
bLock
)
pthread_mutex_lock
(
&
pIdPool
->
mutex
);
int
ret
=
pIdPool
->
numOfFree
;
if
(
bLock
)
pthread_mutex_unlock
(
&
pIdPool
->
mutex
);
return
ret
;
}
\ No newline at end of file
src/util/src/ttokenizer.c
浏览文件 @
f1c1501b
...
...
@@ -231,7 +231,9 @@ static SKeyword keywordTable[] = {
{
"AGGREGATE"
,
TK_AGGREGATE
},
{
"BUFSIZE"
,
TK_BUFSIZE
},
{
"RANGE"
,
TK_RANGE
},
{
"CONTAINS"
,
TK_CONTAINS
}
{
"CONTAINS"
,
TK_CONTAINS
},
{
"TO"
,
TK_TO
},
{
"SPLIT"
,
TK_SPLIT
}
};
static
const
char
isIdChar
[]
=
{
...
...
@@ -704,6 +706,18 @@ SStrToken tStrGetToken(char* str, int32_t* i, bool isPrevOptr) {
return
t0
;
}
/**
* strcpy implement source from SStrToken
*
* @param dst copy to
* @param srcToken copy from
* @return size of copy successful bytes
*/
int32_t
tStrNCpy
(
char
*
dst
,
SStrToken
*
srcToken
)
{
strncpy
(
dst
,
srcToken
->
z
,
srcToken
->
n
);
return
srcToken
->
n
;
}
bool
taosIsKeyWordToken
(
const
char
*
z
,
int32_t
len
)
{
return
(
tKeywordCode
((
char
*
)
z
,
len
)
!=
TK_ID
);
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录