Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
ba06204d
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
ba06204d
编写于
4月 20, 2022
作者:
C
cpwu
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' into cpwu/3.0
上级
a2ea0143
08f1980c
变更
17
显示空白变更内容
内联
并排
Showing
17 changed file
with
3602 addition
and
3229 deletion
+3602
-3229
include/common/tmsg.h
include/common/tmsg.h
+6
-0
include/common/ttokendef.h
include/common/ttokendef.h
+150
-149
include/libs/nodes/cmdnodes.h
include/libs/nodes/cmdnodes.h
+1
-0
include/util/tdef.h
include/util/tdef.h
+6
-2
source/common/src/tmsg.c
source/common/src/tmsg.c
+49
-0
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+4
-0
source/dnode/mnode/impl/src/mndDb.c
source/dnode/mnode/impl/src/mndDb.c
+25
-21
source/dnode/mnode/impl/src/mndStb.c
source/dnode/mnode/impl/src/mndStb.c
+73
-1
source/libs/parser/inc/parAst.h
source/libs/parser/inc/parAst.h
+1
-0
source/libs/parser/inc/sql.y
source/libs/parser/inc/sql.y
+3
-1
source/libs/parser/src/parTokenizer.c
source/libs/parser/src/parTokenizer.c
+1
-0
source/libs/parser/src/parTranslater.c
source/libs/parser/src/parTranslater.c
+350
-132
source/libs/parser/src/sql.c
source/libs/parser/src/sql.c
+2555
-2541
source/libs/scalar/src/sclfunc.c
source/libs/scalar/src/sclfunc.c
+33
-56
tests/script/jenkins/basic.txt
tests/script/jenkins/basic.txt
+1
-1
tests/script/tsim/tmq/consume.sh
tests/script/tsim/tmq/consume.sh
+103
-0
tests/test/c/tmqSim.c
tests/test/c/tmqSim.c
+241
-325
未找到文件。
include/common/tmsg.h
浏览文件 @
ba06204d
...
...
@@ -280,10 +280,14 @@ typedef struct {
int32_t
numOfTags
;
int32_t
numOfSmas
;
int32_t
commentLen
;
int32_t
ast1Len
;
int32_t
ast2Len
;
SArray
*
pColumns
;
// array of SField
SArray
*
pTags
;
// array of SField
SArray
*
pSmas
;
// array of SField
char
*
comment
;
char
*
pAst1
;
char
*
pAst2
;
}
SMCreateStbReq
;
int32_t
tSerializeSMCreateStbReq
(
void
*
buf
,
int32_t
bufLen
,
SMCreateStbReq
*
pReq
);
...
...
@@ -609,6 +613,8 @@ typedef struct {
int8_t
cacheLastRow
;
int8_t
streamMode
;
int8_t
singleSTable
;
int32_t
numOfRetensions
;
SArray
*
pRetensions
;
}
SDbCfgRsp
;
int32_t
tSerializeSDbCfgRsp
(
void
*
buf
,
int32_t
bufLen
,
const
SDbCfgRsp
*
pRsp
);
...
...
include/common/ttokendef.h
浏览文件 @
ba06204d
...
...
@@ -86,155 +86,156 @@
#define TK_SINGLE_STABLE 68
#define TK_STREAM_MODE 69
#define TK_RETENTIONS 70
#define TK_NK_COMMA 71
#define TK_NK_COLON 72
#define TK_TABLE 73
#define TK_NK_LP 74
#define TK_NK_RP 75
#define TK_STABLE 76
#define TK_ADD 77
#define TK_COLUMN 78
#define TK_MODIFY 79
#define TK_RENAME 80
#define TK_TAG 81
#define TK_SET 82
#define TK_NK_EQ 83
#define TK_USING 84
#define TK_TAGS 85
#define TK_NK_DOT 86
#define TK_COMMENT 87
#define TK_BOOL 88
#define TK_TINYINT 89
#define TK_SMALLINT 90
#define TK_INT 91
#define TK_INTEGER 92
#define TK_BIGINT 93
#define TK_FLOAT 94
#define TK_DOUBLE 95
#define TK_BINARY 96
#define TK_TIMESTAMP 97
#define TK_NCHAR 98
#define TK_UNSIGNED 99
#define TK_JSON 100
#define TK_VARCHAR 101
#define TK_MEDIUMBLOB 102
#define TK_BLOB 103
#define TK_VARBINARY 104
#define TK_DECIMAL 105
#define TK_SMA 106
#define TK_ROLLUP 107
#define TK_FILE_FACTOR 108
#define TK_NK_FLOAT 109
#define TK_DELAY 110
#define TK_SHOW 111
#define TK_DATABASES 112
#define TK_TABLES 113
#define TK_STABLES 114
#define TK_MNODES 115
#define TK_MODULES 116
#define TK_QNODES 117
#define TK_FUNCTIONS 118
#define TK_INDEXES 119
#define TK_FROM 120
#define TK_ACCOUNTS 121
#define TK_APPS 122
#define TK_CONNECTIONS 123
#define TK_LICENCE 124
#define TK_GRANTS 125
#define TK_QUERIES 126
#define TK_SCORES 127
#define TK_TOPICS 128
#define TK_VARIABLES 129
#define TK_BNODES 130
#define TK_SNODES 131
#define TK_LIKE 132
#define TK_INDEX 133
#define TK_FULLTEXT 134
#define TK_FUNCTION 135
#define TK_INTERVAL 136
#define TK_TOPIC 137
#define TK_AS 138
#define TK_DESC 139
#define TK_DESCRIBE 140
#define TK_RESET 141
#define TK_QUERY 142
#define TK_EXPLAIN 143
#define TK_ANALYZE 144
#define TK_VERBOSE 145
#define TK_NK_BOOL 146
#define TK_RATIO 147
#define TK_COMPACT 148
#define TK_VNODES 149
#define TK_IN 150
#define TK_OUTPUTTYPE 151
#define TK_AGGREGATE 152
#define TK_BUFSIZE 153
#define TK_STREAM 154
#define TK_INTO 155
#define TK_TRIGGER 156
#define TK_AT_ONCE 157
#define TK_WINDOW_CLOSE 158
#define TK_WATERMARK 159
#define TK_KILL 160
#define TK_CONNECTION 161
#define TK_MERGE 162
#define TK_VGROUP 163
#define TK_REDISTRIBUTE 164
#define TK_SPLIT 165
#define TK_SYNCDB 166
#define TK_NULL 167
#define TK_NK_QUESTION 168
#define TK_NK_ARROW 169
#define TK_ROWTS 170
#define TK_TBNAME 171
#define TK_QSTARTTS 172
#define TK_QENDTS 173
#define TK_WSTARTTS 174
#define TK_WENDTS 175
#define TK_WDURATION 176
#define TK_CAST 177
#define TK_NOW 178
#define TK_TODAY 179
#define TK_TIMEZONE 180
#define TK_COUNT 181
#define TK_FIRST 182
#define TK_LAST 183
#define TK_LAST_ROW 184
#define TK_BETWEEN 185
#define TK_IS 186
#define TK_NK_LT 187
#define TK_NK_GT 188
#define TK_NK_LE 189
#define TK_NK_GE 190
#define TK_NK_NE 191
#define TK_MATCH 192
#define TK_NMATCH 193
#define TK_CONTAINS 194
#define TK_JOIN 195
#define TK_INNER 196
#define TK_SELECT 197
#define TK_DISTINCT 198
#define TK_WHERE 199
#define TK_PARTITION 200
#define TK_BY 201
#define TK_SESSION 202
#define TK_STATE_WINDOW 203
#define TK_SLIDING 204
#define TK_FILL 205
#define TK_VALUE 206
#define TK_NONE 207
#define TK_PREV 208
#define TK_LINEAR 209
#define TK_NEXT 210
#define TK_GROUP 211
#define TK_HAVING 212
#define TK_ORDER 213
#define TK_SLIMIT 214
#define TK_SOFFSET 215
#define TK_LIMIT 216
#define TK_OFFSET 217
#define TK_ASC 218
#define TK_NULLS 219
#define TK_STRICT 71
#define TK_NK_COMMA 72
#define TK_NK_COLON 73
#define TK_TABLE 74
#define TK_NK_LP 75
#define TK_NK_RP 76
#define TK_STABLE 77
#define TK_ADD 78
#define TK_COLUMN 79
#define TK_MODIFY 80
#define TK_RENAME 81
#define TK_TAG 82
#define TK_SET 83
#define TK_NK_EQ 84
#define TK_USING 85
#define TK_TAGS 86
#define TK_NK_DOT 87
#define TK_COMMENT 88
#define TK_BOOL 89
#define TK_TINYINT 90
#define TK_SMALLINT 91
#define TK_INT 92
#define TK_INTEGER 93
#define TK_BIGINT 94
#define TK_FLOAT 95
#define TK_DOUBLE 96
#define TK_BINARY 97
#define TK_TIMESTAMP 98
#define TK_NCHAR 99
#define TK_UNSIGNED 100
#define TK_JSON 101
#define TK_VARCHAR 102
#define TK_MEDIUMBLOB 103
#define TK_BLOB 104
#define TK_VARBINARY 105
#define TK_DECIMAL 106
#define TK_SMA 107
#define TK_ROLLUP 108
#define TK_FILE_FACTOR 109
#define TK_NK_FLOAT 110
#define TK_DELAY 111
#define TK_SHOW 112
#define TK_DATABASES 113
#define TK_TABLES 114
#define TK_STABLES 115
#define TK_MNODES 116
#define TK_MODULES 117
#define TK_QNODES 118
#define TK_FUNCTIONS 119
#define TK_INDEXES 120
#define TK_FROM 121
#define TK_ACCOUNTS 122
#define TK_APPS 123
#define TK_CONNECTIONS 124
#define TK_LICENCE 125
#define TK_GRANTS 126
#define TK_QUERIES 127
#define TK_SCORES 128
#define TK_TOPICS 129
#define TK_VARIABLES 130
#define TK_BNODES 131
#define TK_SNODES 132
#define TK_LIKE 133
#define TK_INDEX 134
#define TK_FULLTEXT 135
#define TK_FUNCTION 136
#define TK_INTERVAL 137
#define TK_TOPIC 138
#define TK_AS 139
#define TK_DESC 140
#define TK_DESCRIBE 141
#define TK_RESET 142
#define TK_QUERY 143
#define TK_EXPLAIN 144
#define TK_ANALYZE 145
#define TK_VERBOSE 146
#define TK_NK_BOOL 147
#define TK_RATIO 148
#define TK_COMPACT 149
#define TK_VNODES 150
#define TK_IN 151
#define TK_OUTPUTTYPE 152
#define TK_AGGREGATE 153
#define TK_BUFSIZE 154
#define TK_STREAM 155
#define TK_INTO 156
#define TK_TRIGGER 157
#define TK_AT_ONCE 158
#define TK_WINDOW_CLOSE 159
#define TK_WATERMARK 160
#define TK_KILL 161
#define TK_CONNECTION 162
#define TK_MERGE 163
#define TK_VGROUP 164
#define TK_REDISTRIBUTE 165
#define TK_SPLIT 166
#define TK_SYNCDB 167
#define TK_NULL 168
#define TK_NK_QUESTION 169
#define TK_NK_ARROW 170
#define TK_ROWTS 171
#define TK_TBNAME 172
#define TK_QSTARTTS 173
#define TK_QENDTS 174
#define TK_WSTARTTS 175
#define TK_WENDTS 176
#define TK_WDURATION 177
#define TK_CAST 178
#define TK_NOW 179
#define TK_TODAY 180
#define TK_TIMEZONE 181
#define TK_COUNT 182
#define TK_FIRST 183
#define TK_LAST 184
#define TK_LAST_ROW 185
#define TK_BETWEEN 186
#define TK_IS 187
#define TK_NK_LT 188
#define TK_NK_GT 189
#define TK_NK_LE 190
#define TK_NK_GE 191
#define TK_NK_NE 192
#define TK_MATCH 193
#define TK_NMATCH 194
#define TK_CONTAINS 195
#define TK_JOIN 196
#define TK_INNER 197
#define TK_SELECT 198
#define TK_DISTINCT 199
#define TK_WHERE 200
#define TK_PARTITION 201
#define TK_BY 202
#define TK_SESSION 203
#define TK_STATE_WINDOW 204
#define TK_SLIDING 205
#define TK_FILL 206
#define TK_VALUE 207
#define TK_NONE 208
#define TK_PREV 209
#define TK_LINEAR 210
#define TK_NEXT 211
#define TK_GROUP 212
#define TK_HAVING 213
#define TK_ORDER 214
#define TK_SLIMIT 215
#define TK_SOFFSET 216
#define TK_LIMIT 217
#define TK_OFFSET 218
#define TK_ASC 219
#define TK_NULLS 220
#define TK_NK_SPACE 300
#define TK_NK_COMMENT 301
...
...
include/libs/nodes/cmdnodes.h
浏览文件 @
ba06204d
...
...
@@ -47,6 +47,7 @@ typedef struct SDatabaseOptions {
SValueNode
*
pNumOfVgroups
;
SValueNode
*
pSingleStable
;
SValueNode
*
pStreamMode
;
SValueNode
*
pStrict
;
SNodeList
*
pRetentions
;
}
SDatabaseOptions
;
...
...
include/util/tdef.h
浏览文件 @
ba06204d
...
...
@@ -390,10 +390,14 @@ typedef enum ELogicConditionType {
#define TSDB_MAX_DB_SINGLE_STABLE_OPTION 1
#define TSDB_DEFAULT_DB_SINGLE_STABLE_OPTION 0
#define TSDB_
MIN_DB_STREAM_MODE_OPTION
0
#define TSDB_
MAX_DB_STREAM_MODE_OPTION
1
#define TSDB_
DB_STREAM_MODE_OPTION_OFF
0
#define TSDB_
DB_STREAM_MODE_OPTION_ON
1
#define TSDB_DEFAULT_DB_STREAM_MODE_OPTION 0
#define TSDB_DB_STRICT_OPTION_OFF 0
#define TSDB_DB_STRICT_OPTION_ON 1
#define TSDB_DEFAULT_DB_STRICT_OPTION 0
#define TSDB_MAX_JOIN_TABLE_NUM 10
#define TSDB_MAX_UNION_CLAUSE 5
...
...
source/common/src/tmsg.c
浏览文件 @
ba06204d
...
...
@@ -611,6 +611,8 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq
if
(
tEncodeI32
(
&
encoder
,
pReq
->
numOfTags
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
numOfSmas
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
commentLen
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
ast1Len
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
ast2Len
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
pReq
->
numOfColumns
;
++
i
)
{
SField
*
pField
=
taosArrayGet
(
pReq
->
pColumns
,
i
);
...
...
@@ -636,6 +638,12 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq
if
(
pReq
->
commentLen
>
0
)
{
if
(
tEncodeBinary
(
&
encoder
,
pReq
->
comment
,
pReq
->
commentLen
)
<
0
)
return
-
1
;
}
if
(
pReq
->
ast1Len
>
0
)
{
if
(
tEncodeBinary
(
&
encoder
,
pReq
->
pAst1
,
pReq
->
ast1Len
)
<
0
)
return
-
1
;
}
if
(
pReq
->
ast2Len
>
0
)
{
if
(
tEncodeBinary
(
&
encoder
,
pReq
->
pAst2
,
pReq
->
ast2Len
)
<
0
)
return
-
1
;
}
tEndEncode
(
&
encoder
);
int32_t
tlen
=
encoder
.
pos
;
...
...
@@ -658,6 +666,8 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
numOfTags
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
numOfSmas
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
commentLen
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
ast1Len
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
ast2Len
)
<
0
)
return
-
1
;
pReq
->
pColumns
=
taosArrayInit
(
pReq
->
numOfColumns
,
sizeof
(
SField
));
pReq
->
pTags
=
taosArrayInit
(
pReq
->
numOfTags
,
sizeof
(
SField
));
...
...
@@ -706,6 +716,18 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
comment
)
<
0
)
return
-
1
;
}
if
(
pReq
->
ast1Len
>
0
)
{
pReq
->
pAst1
=
taosMemoryMalloc
(
pReq
->
ast1Len
);
if
(
pReq
->
pAst1
==
NULL
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
pAst1
)
<
0
)
return
-
1
;
}
if
(
pReq
->
ast2Len
>
0
)
{
pReq
->
pAst2
=
taosMemoryMalloc
(
pReq
->
ast2Len
);
if
(
pReq
->
pAst2
==
NULL
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
pAst2
)
<
0
)
return
-
1
;
}
tEndDecode
(
&
decoder
);
tCoderClear
(
&
decoder
);
...
...
@@ -717,6 +739,8 @@ void tFreeSMCreateStbReq(SMCreateStbReq *pReq) {
taosArrayDestroy
(
pReq
->
pTags
);
taosArrayDestroy
(
pReq
->
pSmas
);
taosMemoryFreeClear
(
pReq
->
comment
);
taosMemoryFreeClear
(
pReq
->
pAst1
);
taosMemoryFreeClear
(
pReq
->
pAst2
);
pReq
->
pColumns
=
NULL
;
pReq
->
pTags
=
NULL
;
pReq
->
pSmas
=
NULL
;
...
...
@@ -2211,6 +2235,14 @@ int32_t tSerializeSDbCfgRsp(void *buf, int32_t bufLen, const SDbCfgRsp *pRsp) {
if
(
tEncodeI8
(
&
encoder
,
pRsp
->
update
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pRsp
->
cacheLastRow
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pRsp
->
streamMode
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pRsp
->
numOfRetensions
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
pRsp
->
numOfRetensions
;
++
i
)
{
SRetention
*
pRetension
=
taosArrayGet
(
pRsp
->
pRetensions
,
i
);
if
(
tEncodeI32
(
&
encoder
,
pRetension
->
freq
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pRetension
->
keep
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pRetension
->
freqUnit
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pRetension
->
keepUnit
)
<
0
)
return
-
1
;
}
tEndEncode
(
&
encoder
);
int32_t
tlen
=
encoder
.
pos
;
...
...
@@ -2242,7 +2274,24 @@ int32_t tDeserializeSDbCfgRsp(void *buf, int32_t bufLen, SDbCfgRsp *pRsp) {
if
(
tDecodeI8
(
&
decoder
,
&
pRsp
->
update
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pRsp
->
cacheLastRow
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pRsp
->
streamMode
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pRsp
->
numOfRetensions
)
<
0
)
return
-
1
;
pRsp
->
pRetensions
=
taosArrayInit
(
pRsp
->
numOfRetensions
,
sizeof
(
SRetention
));
if
(
pRsp
->
pRetensions
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
for
(
int32_t
i
=
0
;
i
<
pRsp
->
numOfRetensions
;
++
i
)
{
SRetention
rentension
=
{
0
};
if
(
tDecodeI32
(
&
decoder
,
&
rentension
.
freq
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
rentension
.
keep
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
rentension
.
freqUnit
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
rentension
.
keepUnit
)
<
0
)
return
-
1
;
if
(
taosArrayPush
(
pRsp
->
pRetensions
,
&
rentension
)
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
}
tEndDecode
(
&
decoder
);
tCoderClear
(
&
decoder
);
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
ba06204d
...
...
@@ -355,10 +355,14 @@ typedef struct {
int32_t
numOfTags
;
int32_t
numOfSmas
;
int32_t
commentLen
;
int32_t
ast1Len
;
int32_t
ast2Len
;
SSchema
*
pColumns
;
SSchema
*
pTags
;
SSchema
*
pSmas
;
char
*
comment
;
char
*
pAst1
;
char
*
pAst2
;
SRWLatch
lock
;
}
SStbObj
;
...
...
source/dnode/mnode/impl/src/mndDb.c
浏览文件 @
ba06204d
...
...
@@ -783,6 +783,8 @@ static int32_t mndProcessGetDbCfgReq(SNodeMsg *pReq) {
cfgRsp
.
cacheLastRow
=
pDb
->
cfg
.
cacheLastRow
;
cfgRsp
.
streamMode
=
pDb
->
cfg
.
streamMode
;
cfgRsp
.
singleSTable
=
pDb
->
cfg
.
singleSTable
;
cfgRsp
.
numOfRetensions
=
pDb
->
cfg
.
numOfRetensions
;
cfgRsp
.
pRetensions
=
pDb
->
cfg
.
pRetensions
;
int32_t
contLen
=
tSerializeSDbCfgRsp
(
NULL
,
0
,
&
cfgRsp
);
void
*
pRsp
=
rpcMallocCont
(
contLen
);
...
...
@@ -797,6 +799,8 @@ static int32_t mndProcessGetDbCfgReq(SNodeMsg *pReq) {
pReq
->
pRsp
=
pRsp
;
pReq
->
rspLen
=
contLen
;
code
=
0
;
GET_DB_CFG_OVER:
if
(
code
!=
0
&&
code
!=
TSDB_CODE_MND_ACTION_IN_PROGRESS
)
{
...
...
source/dnode/mnode/impl/src/mndStb.c
浏览文件 @
ba06204d
...
...
@@ -72,7 +72,7 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
int32_t
size
=
sizeof
(
SStbObj
)
+
(
pStb
->
numOfColumns
+
pStb
->
numOfTags
+
pStb
->
numOfSmas
)
*
sizeof
(
SSchema
)
+
TSDB_STB_RESERVE_SIZE
;
+
pStb
->
commentLen
+
pStb
->
ast1Len
+
pStb
->
ast2Len
+
TSDB_STB_RESERVE_SIZE
;
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_STB
,
TSDB_STB_VER_NUMBER
,
size
);
if
(
pRaw
==
NULL
)
goto
_OVER
;
...
...
@@ -93,6 +93,8 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
SDB_SET_INT32
(
pRaw
,
dataPos
,
pStb
->
numOfTags
,
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pStb
->
numOfSmas
,
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pStb
->
commentLen
,
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pStb
->
ast1Len
,
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pStb
->
ast2Len
,
_OVER
)
for
(
int32_t
i
=
0
;
i
<
pStb
->
numOfColumns
;
++
i
)
{
SSchema
*
pSchema
=
&
pStb
->
pColumns
[
i
];
...
...
@@ -121,6 +123,12 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
if
(
pStb
->
commentLen
>
0
)
{
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pStb
->
comment
,
pStb
->
commentLen
,
_OVER
)
}
if
(
pStb
->
ast1Len
>
0
)
{
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pStb
->
pAst1
,
pStb
->
ast1Len
,
_OVER
)
}
if
(
pStb
->
ast2Len
>
0
)
{
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pStb
->
pAst2
,
pStb
->
ast2Len
,
_OVER
)
}
SDB_SET_RESERVE
(
pRaw
,
dataPos
,
TSDB_STB_RESERVE_SIZE
,
_OVER
)
SDB_SET_DATALEN
(
pRaw
,
dataPos
,
_OVER
)
...
...
@@ -173,6 +181,8 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
pStb
->
numOfTags
,
_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
pStb
->
numOfSmas
,
_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
pStb
->
commentLen
,
_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
pStb
->
ast1Len
,
_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
pStb
->
ast2Len
,
_OVER
)
pStb
->
pColumns
=
taosMemoryCalloc
(
pStb
->
numOfColumns
,
sizeof
(
SSchema
));
pStb
->
pTags
=
taosMemoryCalloc
(
pStb
->
numOfTags
,
sizeof
(
SSchema
));
...
...
@@ -210,6 +220,16 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
if
(
pStb
->
comment
==
NULL
)
goto
_OVER
;
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pStb
->
comment
,
pStb
->
commentLen
,
_OVER
)
}
if
(
pStb
->
ast1Len
>
0
)
{
pStb
->
pAst1
=
taosMemoryCalloc
(
pStb
->
ast1Len
,
1
);
if
(
pStb
->
pAst1
==
NULL
)
goto
_OVER
;
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pStb
->
pAst1
,
pStb
->
ast1Len
,
_OVER
)
}
if
(
pStb
->
ast2Len
>
0
)
{
pStb
->
pAst2
=
taosMemoryCalloc
(
pStb
->
ast2Len
,
1
);
if
(
pStb
->
pAst2
==
NULL
)
goto
_OVER
;
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pStb
->
pAst2
,
pStb
->
ast2Len
,
_OVER
)
}
SDB_GET_RESERVE
(
pRaw
,
dataPos
,
TSDB_STB_RESERVE_SIZE
,
_OVER
)
terrno
=
0
;
...
...
@@ -238,6 +258,8 @@ static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb) {
taosMemoryFreeClear
(
pStb
->
pColumns
);
taosMemoryFreeClear
(
pStb
->
pTags
);
taosMemoryFreeClear
(
pStb
->
comment
);
taosMemoryFreeClear
(
pStb
->
pAst1
);
taosMemoryFreeClear
(
pStb
->
pAst2
);
return
0
;
}
...
...
@@ -294,6 +316,30 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) {
}
}
if
(
pOld
->
ast1Len
<
pNew
->
ast1Len
)
{
void
*
pAst1
=
taosMemoryMalloc
(
pNew
->
ast1Len
);
if
(
pAst1
!=
NULL
)
{
taosMemoryFree
(
pOld
->
pAst1
);
pOld
->
pAst1
=
pAst1
;
}
else
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
mTrace
(
"stb:%s, failed to perform update action since %s"
,
pOld
->
name
,
terrstr
());
taosWUnLockLatch
(
&
pOld
->
lock
);
}
}
if
(
pOld
->
ast2Len
<
pNew
->
ast2Len
)
{
void
*
pAst2
=
taosMemoryMalloc
(
pNew
->
ast2Len
);
if
(
pAst2
!=
NULL
)
{
taosMemoryFree
(
pOld
->
pAst2
);
pOld
->
pAst2
=
pAst2
;
}
else
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
mTrace
(
"stb:%s, failed to perform update action since %s"
,
pOld
->
name
,
terrstr
());
taosWUnLockLatch
(
&
pOld
->
lock
);
}
}
pOld
->
updateTime
=
pNew
->
updateTime
;
pOld
->
version
=
pNew
->
version
;
pOld
->
nextColId
=
pNew
->
nextColId
;
...
...
@@ -304,6 +350,12 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) {
if
(
pNew
->
commentLen
!=
0
)
{
memcpy
(
pOld
->
comment
,
pNew
->
comment
,
TSDB_STB_COMMENT_LEN
);
}
if
(
pNew
->
ast1Len
!=
0
)
{
memcpy
(
pOld
->
pAst1
,
pNew
->
pAst1
,
pNew
->
ast1Len
);
}
if
(
pNew
->
ast2Len
!=
0
)
{
memcpy
(
pOld
->
pAst2
,
pNew
->
pAst2
,
pNew
->
ast2Len
);
}
taosWUnLockLatch
(
&
pOld
->
lock
);
return
0
;
}
...
...
@@ -646,6 +698,26 @@ static int32_t mndCreateStb(SMnode *pMnode, SNodeMsg *pReq, SMCreateStbReq *pCre
memcpy
(
stbObj
.
comment
,
pCreate
->
comment
,
stbObj
.
commentLen
);
}
stbObj
.
ast1Len
=
pCreate
->
ast1Len
;
if
(
stbObj
.
ast1Len
>
0
)
{
stbObj
.
pAst1
=
taosMemoryCalloc
(
stbObj
.
ast1Len
,
1
);
if
(
stbObj
.
pAst1
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
memcpy
(
stbObj
.
pAst1
,
pCreate
->
pAst1
,
stbObj
.
ast1Len
);
}
stbObj
.
ast2Len
=
pCreate
->
ast2Len
;
if
(
stbObj
.
ast2Len
>
0
)
{
stbObj
.
pAst2
=
taosMemoryCalloc
(
stbObj
.
ast2Len
,
1
);
if
(
stbObj
.
pAst2
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
memcpy
(
stbObj
.
pAst2
,
pCreate
->
pAst2
,
stbObj
.
ast2Len
);
}
stbObj
.
pColumns
=
taosMemoryMalloc
(
stbObj
.
numOfColumns
*
sizeof
(
SSchema
));
stbObj
.
pTags
=
taosMemoryMalloc
(
stbObj
.
numOfTags
*
sizeof
(
SSchema
));
stbObj
.
pSmas
=
taosMemoryMalloc
(
stbObj
.
numOfSmas
*
sizeof
(
SSchema
));
...
...
source/libs/parser/inc/parAst.h
浏览文件 @
ba06204d
...
...
@@ -53,6 +53,7 @@ typedef enum EDatabaseOptionType {
DB_OPTION_VGROUPS
,
DB_OPTION_SINGLE_STABLE
,
DB_OPTION_STREAM_MODE
,
DB_OPTION_STRICT
,
DB_OPTION_RETENTIONS
}
EDatabaseOptionType
;
...
...
source/libs/parser/inc/sql.y
浏览文件 @
ba06204d
...
...
@@ -161,6 +161,7 @@ db_options(A) ::= db_options(B) VGROUPS NK_INTEGER(C).
db_options(A) ::= db_options(B) SINGLE_STABLE NK_INTEGER(C). { ((SDatabaseOptions*)B)->pSingleStable = (SValueNode*)createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &C); A = B; }
db_options(A) ::= db_options(B) STREAM_MODE NK_INTEGER(C). { ((SDatabaseOptions*)B)->pStreamMode = (SValueNode*)createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &C); A = B; }
db_options(A) ::= db_options(B) RETENTIONS retention_list(C). { ((SDatabaseOptions*)B)->pRetentions = C; A = B; }
db_options(A) ::= db_options(B) STRICT NK_INTEGER(C). { ((SDatabaseOptions*)B)->pStrict = (SValueNode*)createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &C); A = B; }
alter_db_options(A) ::= alter_db_option(B). { A = createDatabaseOptions(pCxt); A = setDatabaseAlterOption(pCxt, A, &B); }
alter_db_options(A) ::= alter_db_options(B) alter_db_option(C). { A = setDatabaseAlterOption(pCxt, B, &C); }
...
...
@@ -175,6 +176,7 @@ alter_db_option(A) ::= WAL NK_INTEGER(B).
alter_db_option(A) ::= QUORUM NK_INTEGER(B). { A.type = DB_OPTION_QUORUM; A.pVal = (SValueNode*)createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &B); }
alter_db_option(A) ::= CACHELAST NK_INTEGER(B). { A.type = DB_OPTION_CACHELAST; A.pVal = (SValueNode*)createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &B); }
alter_db_option(A) ::= REPLICA NK_INTEGER(B). { A.type = DB_OPTION_REPLICA; A.pVal = (SValueNode*)createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &B); }
alter_db_option(A) ::= STRICT NK_INTEGER(B). { A.type = DB_OPTION_STRICT; A.pVal = (SValueNode*)createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &B); }
%type integer_list { SNodeList* }
%destructor integer_list { nodesDestroyList($$); }
...
...
@@ -359,7 +361,7 @@ from_db_opt(A) ::= FROM db_name(B).
%type func_name_list { SNodeList* }
%destructor func_name_list { nodesDestroyList($$); }
func_name_list(A) ::= func_name(B). { A = createNodeList(pCxt, B); }
func_name_list(A) ::= func_name_list(B) NK_COMMA
col_name(C).
{ A = addNodeToList(pCxt, B, C); }
func_name_list(A) ::= func_name_list(B) NK_COMMA
func_name(C).
{ A = addNodeToList(pCxt, B, C); }
func_name(A) ::= function_name(B). { A = createFunctionNode(pCxt, &B, NULL); }
...
...
source/libs/parser/src/parTokenizer.c
浏览文件 @
ba06204d
...
...
@@ -168,6 +168,7 @@ static SKeyword keywordTable[] = {
{
"STREAM"
,
TK_STREAM
},
{
"STREAMS"
,
TK_STREAMS
},
{
"STREAM_MODE"
,
TK_STREAM_MODE
},
{
"STRICT"
,
TK_STRICT
},
{
"SYNCDB"
,
TK_SYNCDB
},
{
"TABLE"
,
TK_TABLE
},
{
"TABLES"
,
TK_TABLES
},
...
...
source/libs/parser/src/parTranslater.c
浏览文件 @
ba06204d
...
...
@@ -198,6 +198,56 @@ static int32_t getDBVgVersion(STranslateContext* pCxt, const char* pDbFName, int
return
code
;
}
static
int32_t
getDBCfg
(
STranslateContext
*
pCxt
,
const
char
*
pDbName
,
SDbCfgInfo
*
pInfo
)
{
SParseContext
*
pParCxt
=
pCxt
->
pParseCxt
;
SName
name
;
tNameSetDbName
(
&
name
,
pCxt
->
pParseCxt
->
acctId
,
pDbName
,
strlen
(
pDbName
));
char
dbFname
[
TSDB_DB_FNAME_LEN
]
=
{
0
};
tNameGetFullDbName
(
&
name
,
dbFname
);
int32_t
code
=
collectUseDatabaseImpl
(
dbFname
,
pCxt
->
pDbs
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
catalogGetDBCfg
(
pParCxt
->
pCatalog
,
pParCxt
->
pTransporter
,
&
pParCxt
->
mgmtEpSet
,
dbFname
,
pInfo
);
}
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
parserError
(
"catalogGetDBCfg error, code:%s, dbFName:%s"
,
tstrerror
(
code
),
dbFname
);
}
return
code
;
}
static
int32_t
initTranslateContext
(
SParseContext
*
pParseCxt
,
STranslateContext
*
pCxt
)
{
pCxt
->
pParseCxt
=
pParseCxt
;
pCxt
->
errCode
=
TSDB_CODE_SUCCESS
;
pCxt
->
msgBuf
.
buf
=
pParseCxt
->
pMsg
;
pCxt
->
msgBuf
.
len
=
pParseCxt
->
msgLen
;
pCxt
->
pNsLevel
=
taosArrayInit
(
TARRAY_MIN_SIZE
,
POINTER_BYTES
);
pCxt
->
currLevel
=
0
;
pCxt
->
currClause
=
0
;
pCxt
->
pDbs
=
taosHashInit
(
4
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_NO_LOCK
);
pCxt
->
pTables
=
taosHashInit
(
4
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_NO_LOCK
);
if
(
NULL
==
pCxt
->
pNsLevel
||
NULL
==
pCxt
->
pDbs
||
NULL
==
pCxt
->
pTables
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
return
TSDB_CODE_SUCCESS
;
}
static
void
destroyTranslateContext
(
STranslateContext
*
pCxt
)
{
if
(
NULL
!=
pCxt
->
pNsLevel
)
{
size_t
size
=
taosArrayGetSize
(
pCxt
->
pNsLevel
);
for
(
size_t
i
=
0
;
i
<
size
;
++
i
)
{
taosArrayDestroy
(
taosArrayGetP
(
pCxt
->
pNsLevel
,
i
));
}
taosArrayDestroy
(
pCxt
->
pNsLevel
);
}
if
(
NULL
!=
pCxt
->
pCmdMsg
)
{
taosMemoryFreeClear
(
pCxt
->
pCmdMsg
->
pMsg
);
taosMemoryFreeClear
(
pCxt
->
pCmdMsg
);
}
taosHashCleanup
(
pCxt
->
pDbs
);
taosHashCleanup
(
pCxt
->
pTables
);
}
static
bool
belongTable
(
const
char
*
currentDb
,
const
SColumnNode
*
pCol
,
const
STableNode
*
pTable
)
{
int
cmp
=
0
;
if
(
'\0'
!=
pCol
->
dbName
[
0
])
{
...
...
@@ -749,6 +799,8 @@ static int32_t translateTable(STranslateContext* pCxt, SNode* pTable) {
case
QUERY_NODE_REAL_TABLE
:
{
SRealTableNode
*
pRealTable
=
(
SRealTableNode
*
)
pTable
;
pRealTable
->
ratio
=
(
NULL
!=
pCxt
->
pExplainOpt
?
pCxt
->
pExplainOpt
->
ratio
:
1
.
0
);
// The SRealTableNode created through ROLLUP already has STableMeta.
if
(
NULL
==
pRealTable
->
pMeta
)
{
SName
name
;
code
=
getTableMetaImpl
(
pCxt
,
toName
(
pCxt
->
pParseCxt
->
acctId
,
pRealTable
->
table
.
dbName
,
pRealTable
->
table
.
tableName
,
&
name
),
...
...
@@ -756,8 +808,9 @@ static int32_t translateTable(STranslateContext* pCxt, SNode* pTable) {
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
return
generateSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_TABLE_NOT_EXIST
,
pRealTable
->
table
.
tableName
);
}
pRealTable
->
table
.
precision
=
pRealTable
->
pMeta
->
tableInfo
.
precision
;
code
=
setTableVgroupList
(
pCxt
,
&
name
,
pRealTable
);
}
pRealTable
->
table
.
precision
=
pRealTable
->
pMeta
->
tableInfo
.
precision
;
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
addNamespace
(
pCxt
,
pRealTable
);
}
...
...
@@ -1365,6 +1418,7 @@ static int32_t buildCreateDbReq(STranslateContext* pCxt, SCreateDatabaseStmt* pS
pReq
->
streamMode
=
GET_OPTION_VAL
(
pStmt
->
pOptions
->
pStreamMode
,
TSDB_DEFAULT_DB_STREAM_MODE_OPTION
);
pReq
->
ttl
=
GET_OPTION_VAL
(
pStmt
->
pOptions
->
pTtl
,
TSDB_DEFAULT_DB_TTL_OPTION
);
pReq
->
singleSTable
=
GET_OPTION_VAL
(
pStmt
->
pOptions
->
pSingleStable
,
TSDB_DEFAULT_DB_SINGLE_STABLE_OPTION
);
// pReq->strict = GET_OPTION_VAL(pStmt->pOptions->pStrict, TSDB_DEFAULT_DB_SINGLE_STABLE_OPTION);
return
buildCreateDbRetentions
(
pStmt
->
pOptions
->
pRetentions
,
pReq
);
}
...
...
@@ -1573,12 +1627,16 @@ static int32_t checkDatabaseOptions(STranslateContext* pCxt, SDatabaseOptions* p
TSDB_MAX_DB_SINGLE_STABLE_OPTION
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
checkDbEnumOption
(
pCxt
,
"streamMode"
,
pOptions
->
pStreamMode
,
TSDB_
MIN_DB_STREAM_MODE_OPTION
,
TSDB_
MAX_DB_STREAM_MODE_OPTI
ON
);
code
=
checkDbEnumOption
(
pCxt
,
"streamMode"
,
pOptions
->
pStreamMode
,
TSDB_
DB_STREAM_MODE_OPTION_OFF
,
TSDB_
DB_STREAM_MODE_OPTION_
ON
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
checkDbRetentionsOption
(
pCxt
,
pOptions
->
pRetentions
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
checkDbEnumOption
(
pCxt
,
"strict"
,
pOptions
->
pStrict
,
TSDB_DB_STRICT_OPTION_OFF
,
TSDB_DB_STRICT_OPTION_ON
);
}
return
code
;
}
...
...
@@ -1789,35 +1847,259 @@ static int32_t getAggregationMethod(SNodeList* pFuncs) {
return
((
SFunctionNode
*
)
nodesListGetNode
(
pFuncs
,
0
))
->
funcId
;
}
static
int32_t
translateCreateSuperTable
(
STranslateContext
*
pCxt
,
SCreateTableStmt
*
pStmt
)
{
int32_t
code
=
checkCreateTable
(
pCxt
,
pStmt
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
static
void
toSchema
(
const
SColumnDefNode
*
pCol
,
col_id_t
colId
,
SSchema
*
pSchema
)
{
int8_t
flags
=
0
;
if
(
pCol
->
sma
)
{
flags
|=
SCHEMA_SMA_ON
;
}
pSchema
->
colId
=
colId
;
pSchema
->
type
=
pCol
->
dataType
.
type
;
pSchema
->
bytes
=
calcTypeBytes
(
pCol
->
dataType
);
pSchema
->
flags
=
flags
;
strcpy
(
pSchema
->
name
,
pCol
->
colName
);
}
typedef
struct
SSampleAstInfo
{
const
char
*
pDbName
;
const
char
*
pTableName
;
SNodeList
*
pFuncs
;
SNode
*
pInterval
;
SNode
*
pOffset
;
SNode
*
pSliding
;
STableMeta
*
pRollupTableMeta
;
}
SSampleAstInfo
;
static
int32_t
buildSampleAst
(
STranslateContext
*
pCxt
,
SSampleAstInfo
*
pInfo
,
char
**
pAst
,
int32_t
*
pLen
)
{
SSelectStmt
*
pSelect
=
nodesMakeNode
(
QUERY_NODE_SELECT_STMT
);
if
(
NULL
==
pSelect
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
sprintf
(
pSelect
->
stmtName
,
"%p"
,
pSelect
);
SRealTableNode
*
pTable
=
nodesMakeNode
(
QUERY_NODE_REAL_TABLE
);
if
(
NULL
==
pTable
)
{
nodesDestroyNode
(
pSelect
);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
strcpy
(
pTable
->
table
.
dbName
,
pInfo
->
pDbName
);
strcpy
(
pTable
->
table
.
tableName
,
pInfo
->
pTableName
);
TSWAP
(
pTable
->
pMeta
,
pInfo
->
pRollupTableMeta
,
STableMeta
*
);
pSelect
->
pFromTable
=
(
SNode
*
)
pTable
;
TSWAP
(
pSelect
->
pProjectionList
,
pInfo
->
pFuncs
,
SNodeList
*
);
SFunctionNode
*
pFunc
=
nodesMakeNode
(
QUERY_NODE_FUNCTION
);
if
(
NULL
==
pSelect
->
pProjectionList
||
NULL
==
pFunc
)
{
nodesDestroyNode
(
pSelect
);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
strcpy
(
pFunc
->
functionName
,
"_wstartts"
);
nodesListPushFront
(
pSelect
->
pProjectionList
,
pFunc
);
SNode
*
pProject
=
NULL
;
FOREACH
(
pProject
,
pSelect
->
pProjectionList
)
{
sprintf
(((
SExprNode
*
)
pProject
)
->
aliasName
,
"#%p"
,
pProject
);
}
SIntervalWindowNode
*
pInterval
=
nodesMakeNode
(
QUERY_NODE_INTERVAL_WINDOW
);
if
(
NULL
==
pInterval
)
{
nodesDestroyNode
(
pSelect
);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
pSelect
->
pWindow
=
(
SNode
*
)
pInterval
;
TSWAP
(
pInterval
->
pInterval
,
pInfo
->
pInterval
,
SNode
*
);
TSWAP
(
pInterval
->
pOffset
,
pInfo
->
pOffset
,
SNode
*
);
TSWAP
(
pInterval
->
pSliding
,
pInfo
->
pSliding
,
SNode
*
);
pInterval
->
pCol
=
nodesMakeNode
(
QUERY_NODE_COLUMN
);
if
(
NULL
==
pInterval
->
pCol
)
{
nodesDestroyNode
(
pSelect
);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
((
SColumnNode
*
)
pInterval
->
pCol
)
->
colId
=
PRIMARYKEY_TIMESTAMP_COL_ID
;
strcpy
(((
SColumnNode
*
)
pInterval
->
pCol
)
->
colName
,
PK_TS_COL_INTERNAL_NAME
);
int32_t
code
=
translateQuery
(
pCxt
,
(
SNode
*
)
pSelect
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodesNodeToString
(
pSelect
,
false
,
pAst
,
pLen
);
}
nodesDestroyNode
(
pSelect
);
return
code
;
}
static
void
clearSampleAstInfo
(
SSampleAstInfo
*
pInfo
)
{
nodesDestroyList
(
pInfo
->
pFuncs
);
nodesDestroyNode
(
pInfo
->
pInterval
);
nodesDestroyNode
(
pInfo
->
pOffset
);
nodesDestroyNode
(
pInfo
->
pSliding
);
}
static
SNode
*
makeIntervalVal
(
SRetention
*
pRetension
,
int8_t
precision
)
{
SValueNode
*
pVal
=
nodesMakeNode
(
QUERY_NODE_VALUE
);
if
(
NULL
==
pVal
)
{
return
NULL
;
}
int64_t
timeVal
=
convertTimeFromPrecisionToUnit
(
pRetension
->
freq
,
precision
,
pRetension
->
freqUnit
);
char
buf
[
20
]
=
{
0
};
int32_t
len
=
snprintf
(
buf
,
sizeof
(
buf
),
"%"
PRId64
"%c"
,
timeVal
,
pRetension
->
freqUnit
);
pVal
->
literal
=
strndup
(
buf
,
len
);
if
(
NULL
==
pVal
->
literal
)
{
nodesDestroyNode
(
pVal
);
return
NULL
;
}
pVal
->
isDuration
=
true
;
pVal
->
node
.
resType
.
type
=
TSDB_DATA_TYPE_BIGINT
;
pVal
->
node
.
resType
.
bytes
=
tDataTypes
[
TSDB_DATA_TYPE_BIGINT
].
bytes
;
pVal
->
node
.
resType
.
precision
=
precision
;
return
(
SNode
*
)
pVal
;
}
static
SNode
*
createColumnFromDef
(
SColumnDefNode
*
pDef
)
{
SColumnNode
*
pCol
=
nodesMakeNode
(
QUERY_NODE_COLUMN
);
if
(
NULL
==
pCol
)
{
return
NULL
;
}
strcpy
(
pCol
->
colName
,
pDef
->
colName
);
return
(
SNode
*
)
pCol
;
}
SMCreateStbReq
createReq
=
{
0
};
createReq
.
igExists
=
pStmt
->
ignoreExists
;
createReq
.
aggregationMethod
=
getAggregationMethod
(
pStmt
->
pOptions
->
pFuncs
);
createReq
.
xFilesFactor
=
GET_OPTION_VAL
(
pStmt
->
pOptions
->
pFilesFactor
,
TSDB_DEFAULT_DB_FILE_FACTOR
);
createReq
.
delay
=
GET_OPTION_VAL
(
pStmt
->
pOptions
->
pDelay
,
TSDB_DEFAULT_DB_DELAY
);
columnDefNodeToField
(
pStmt
->
pCols
,
&
createReq
.
pColumns
);
columnDefNodeToField
(
pStmt
->
pTags
,
&
createReq
.
pTags
);
createReq
.
numOfColumns
=
LIST_LENGTH
(
pStmt
->
pCols
);
createReq
.
numOfTags
=
LIST_LENGTH
(
pStmt
->
pTags
);
static
SNode
*
createRollupFunc
(
SNode
*
pSrcFunc
,
SColumnDefNode
*
pColDef
)
{
SFunctionNode
*
pFunc
=
nodesCloneNode
(
pSrcFunc
);
if
(
NULL
==
pFunc
)
{
return
NULL
;
}
if
(
TSDB_CODE_SUCCESS
!=
nodesListMakeStrictAppend
(
&
pFunc
->
pParameterList
,
createColumnFromDef
(
pColDef
)))
{
nodesDestroyNode
(
pFunc
);
return
NULL
;
}
return
(
SNode
*
)
pFunc
;
}
static
SNodeList
*
createRollupFuncs
(
SCreateTableStmt
*
pStmt
)
{
SNodeList
*
pFuncs
=
nodesMakeList
();
if
(
NULL
==
pFuncs
)
{
return
NULL
;
}
SNode
*
pFunc
=
NULL
;
FOREACH
(
pFunc
,
pStmt
->
pOptions
->
pFuncs
)
{
SNode
*
pCol
=
NULL
;
bool
primaryKey
=
true
;
FOREACH
(
pCol
,
pStmt
->
pCols
)
{
if
(
primaryKey
)
{
primaryKey
=
false
;
continue
;
}
if
(
TSDB_CODE_SUCCESS
!=
nodesListStrictAppend
(
pFuncs
,
createRollupFunc
(
pFunc
,
(
SColumnDefNode
*
)
pCol
)))
{
nodesDestroyList
(
pFuncs
);
return
NULL
;
}
}
}
return
pFuncs
;
}
static
STableMeta
*
createRollupTableMeta
(
SCreateTableStmt
*
pStmt
,
int8_t
precision
)
{
int32_t
numOfField
=
LIST_LENGTH
(
pStmt
->
pCols
)
+
LIST_LENGTH
(
pStmt
->
pTags
);
STableMeta
*
pMeta
=
taosMemoryCalloc
(
1
,
sizeof
(
STableMeta
)
+
numOfField
*
sizeof
(
SSchema
));
if
(
NULL
==
pMeta
)
{
return
NULL
;
}
pMeta
->
tableType
=
TSDB_SUPER_TABLE
;
pMeta
->
tableInfo
.
numOfTags
=
LIST_LENGTH
(
pStmt
->
pTags
);
pMeta
->
tableInfo
.
precision
=
precision
;
pMeta
->
tableInfo
.
numOfColumns
=
LIST_LENGTH
(
pStmt
->
pCols
);
int32_t
index
=
0
;
SNode
*
pCol
=
NULL
;
FOREACH
(
pCol
,
pStmt
->
pCols
)
{
toSchema
((
SColumnDefNode
*
)
pCol
,
index
+
1
,
pMeta
->
schema
+
index
);
++
index
;
}
SNode
*
pTag
=
NULL
;
FOREACH
(
pTag
,
pStmt
->
pTags
)
{
toSchema
((
SColumnDefNode
*
)
pTag
,
index
+
1
,
pMeta
->
schema
+
index
);
++
index
;
}
return
pMeta
;
}
static
int32_t
buildSampleAstInfoByTable
(
STranslateContext
*
pCxt
,
SCreateTableStmt
*
pStmt
,
SRetention
*
pRetension
,
int8_t
precision
,
SSampleAstInfo
*
pInfo
)
{
pInfo
->
pDbName
=
pStmt
->
dbName
;
pInfo
->
pTableName
=
pStmt
->
tableName
;
pInfo
->
pFuncs
=
createRollupFuncs
(
pStmt
);
pInfo
->
pInterval
=
makeIntervalVal
(
pRetension
,
precision
);
pInfo
->
pRollupTableMeta
=
createRollupTableMeta
(
pStmt
,
precision
);
if
(
NULL
==
pInfo
->
pFuncs
||
NULL
==
pInfo
->
pInterval
||
NULL
==
pInfo
->
pRollupTableMeta
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
getRollupAst
(
STranslateContext
*
pCxt
,
SCreateTableStmt
*
pStmt
,
SRetention
*
pRetension
,
int8_t
precision
,
char
**
pAst
,
int32_t
*
pLen
)
{
SSampleAstInfo
info
=
{
0
};
int32_t
code
=
buildSampleAstInfoByTable
(
pCxt
,
pStmt
,
pRetension
,
precision
,
&
info
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
buildSampleAst
(
pCxt
,
&
info
,
pAst
,
pLen
);
}
clearSampleAstInfo
(
&
info
);
return
code
;
}
static
int32_t
buildRollupAst
(
STranslateContext
*
pCxt
,
SCreateTableStmt
*
pStmt
,
SMCreateStbReq
*
pReq
)
{
SDbCfgInfo
dbCfg
=
{
0
};
int32_t
code
=
getDBCfg
(
pCxt
,
pStmt
->
dbName
,
&
dbCfg
);
int32_t
num
=
taosArrayGetSize
(
dbCfg
.
pRetensions
);
if
(
TSDB_CODE_SUCCESS
!=
code
||
num
<
2
)
{
return
code
;
}
for
(
int32_t
i
=
1
;
i
<
num
;
++
i
)
{
SRetention
*
pRetension
=
taosArrayGet
(
dbCfg
.
pRetensions
,
i
);
STranslateContext
cxt
=
{
0
};
initTranslateContext
(
pCxt
->
pParseCxt
,
&
cxt
);
code
=
getRollupAst
(
&
cxt
,
pStmt
,
pRetension
,
dbCfg
.
precision
,
1
==
i
?
&
pReq
->
pAst1
:
&
pReq
->
pAst2
,
1
==
i
?
&
pReq
->
ast1Len
:
&
pReq
->
ast2Len
);
destroyTranslateContext
(
&
cxt
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
break
;
}
}
return
code
;
}
static
int32_t
buildCreateStbReq
(
STranslateContext
*
pCxt
,
SCreateTableStmt
*
pStmt
,
SMCreateStbReq
*
pReq
)
{
pReq
->
igExists
=
pStmt
->
ignoreExists
;
pReq
->
aggregationMethod
=
getAggregationMethod
(
pStmt
->
pOptions
->
pFuncs
);
pReq
->
xFilesFactor
=
GET_OPTION_VAL
(
pStmt
->
pOptions
->
pFilesFactor
,
TSDB_DEFAULT_DB_FILE_FACTOR
);
pReq
->
delay
=
GET_OPTION_VAL
(
pStmt
->
pOptions
->
pDelay
,
TSDB_DEFAULT_DB_DELAY
);
columnDefNodeToField
(
pStmt
->
pCols
,
&
pReq
->
pColumns
);
columnDefNodeToField
(
pStmt
->
pTags
,
&
pReq
->
pTags
);
pReq
->
numOfColumns
=
LIST_LENGTH
(
pStmt
->
pCols
);
pReq
->
numOfTags
=
LIST_LENGTH
(
pStmt
->
pTags
);
if
(
NULL
==
pStmt
->
pOptions
->
pSma
)
{
columnDefNodeToField
(
pStmt
->
pCols
,
&
createReq
.
pSmas
);
createReq
.
numOfSmas
=
createReq
.
numOfColumns
;
columnDefNodeToField
(
pStmt
->
pCols
,
&
pReq
->
pSmas
);
pReq
->
numOfSmas
=
pReq
->
numOfColumns
;
}
else
{
columnNodeToField
(
pStmt
->
pOptions
->
pSma
,
&
createReq
.
pSmas
);
createReq
.
numOfSmas
=
LIST_LENGTH
(
pStmt
->
pOptions
->
pSma
);
columnNodeToField
(
pStmt
->
pOptions
->
pSma
,
&
pReq
->
pSmas
);
pReq
->
numOfSmas
=
LIST_LENGTH
(
pStmt
->
pOptions
->
pSma
);
}
SName
tableName
=
{.
type
=
TSDB_TABLE_NAME_T
,
.
acctId
=
pCxt
->
pParseCxt
->
acctId
};
strcpy
(
tableName
.
dbname
,
pStmt
->
dbName
);
strcpy
(
tableName
.
tname
,
pStmt
->
tableName
);
tNameExtractFullName
(
&
tableName
,
createReq
.
name
);
tNameExtractFullName
(
&
tableName
,
pReq
->
name
);
return
buildRollupAst
(
pCxt
,
pStmt
,
pReq
);
}
static
int32_t
translateCreateSuperTable
(
STranslateContext
*
pCxt
,
SCreateTableStmt
*
pStmt
)
{
SMCreateStbReq
createReq
=
{
0
};
int32_t
code
=
checkCreateTable
(
pCxt
,
pStmt
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
buildCreateStbReq
(
pCxt
,
pStmt
,
&
createReq
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
buildCmdMsg
(
pCxt
,
TDMT_MND_CREATE_STB
,
(
FSerializeFunc
)
tSerializeSMCreateStbReq
,
&
createReq
);
}
tFreeSMCreateStbReq
(
&
createReq
);
return
code
;
}
...
...
@@ -1983,20 +2265,20 @@ static int32_t translateAlterDnode(STranslateContext* pCxt, SAlterDnodeStmt* pSt
}
static
int32_t
nodeTypeToShowType
(
ENodeType
nt
)
{
//
switch (nt) {
//
case QUERY_NODE_SHOW_CONNECTIONS_STMT:
//
return TSDB_MGMT_TABLE_CONNS;
//
case QUERY_NODE_SHOW_LICENCE_STMT:
//
return TSDB_MGMT_TABLE_GRANTS;
//
case QUERY_NODE_SHOW_QUERIES_STMT:
//
return TSDB_MGMT_TABLE_QUERIES;
//
case QUERY_NODE_SHOW_TOPICS_STMT:
//
return 0; // todo
//
case QUERY_NODE_SHOW_VARIABLE_STMT:
// return TSDB_MGMT_TABLE_VARIABLES;
//
default:
//
break;
//
}
switch
(
nt
)
{
case
QUERY_NODE_SHOW_CONNECTIONS_STMT
:
return
TSDB_MGMT_TABLE_CONNS
;
case
QUERY_NODE_SHOW_LICENCE_STMT
:
return
TSDB_MGMT_TABLE_GRANTS
;
case
QUERY_NODE_SHOW_QUERIES_STMT
:
return
TSDB_MGMT_TABLE_QUERIES
;
case
QUERY_NODE_SHOW_TOPICS_STMT
:
return
0
;
// todo
case
QUERY_NODE_SHOW_VARIABLE_STMT
:
return
0
;
// todo
default:
break
;
}
return
0
;
}
...
...
@@ -2027,57 +2309,28 @@ static int32_t getSmaIndexExpr(STranslateContext* pCxt, SCreateIndexStmt* pStmt,
return
nodesListToString
(
pStmt
->
pOptions
->
pFuncs
,
false
,
pExpr
,
pLen
);
}
static
int32_t
getSmaIndexBuildAst
(
STranslateContext
*
pCxt
,
SCreateIndexStmt
*
pStmt
,
char
**
pAst
,
int32_t
*
pLen
)
{
SSelectStmt
*
pSelect
=
nodesMakeNode
(
QUERY_NODE_SELECT_STMT
);
if
(
NULL
==
pSelect
)
{
static
int32_t
buildSampleAstInfoByIndex
(
STranslateContext
*
pCxt
,
SCreateIndexStmt
*
pStmt
,
SSampleAstInfo
*
pInfo
)
{
pInfo
->
pDbName
=
pCxt
->
pParseCxt
->
db
;
pInfo
->
pTableName
=
pStmt
->
tableName
;
pInfo
->
pFuncs
=
nodesCloneList
(
pStmt
->
pOptions
->
pFuncs
);
pInfo
->
pInterval
=
nodesCloneNode
(
pStmt
->
pOptions
->
pInterval
);
pInfo
->
pOffset
=
nodesCloneNode
(
pStmt
->
pOptions
->
pOffset
);
pInfo
->
pSliding
=
nodesCloneNode
(
pStmt
->
pOptions
->
pSliding
);
if
(
NULL
==
pInfo
->
pFuncs
||
NULL
==
pInfo
->
pInterval
||
(
NULL
!=
pStmt
->
pOptions
->
pOffset
&&
NULL
==
pInfo
->
pOffset
)
||
(
NULL
!=
pStmt
->
pOptions
->
pSliding
&&
NULL
==
pInfo
->
pSliding
))
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
sprintf
(
pSelect
->
stmtName
,
"%p"
,
pSelect
);
SRealTableNode
*
pTable
=
nodesMakeNode
(
QUERY_NODE_REAL_TABLE
);
if
(
NULL
==
pTable
)
{
nodesDestroyNode
(
pSelect
);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
strcpy
(
pTable
->
table
.
dbName
,
pCxt
->
pParseCxt
->
db
);
strcpy
(
pTable
->
table
.
tableName
,
pStmt
->
tableName
);
pSelect
->
pFromTable
=
(
SNode
*
)
pTable
;
pSelect
->
pProjectionList
=
nodesCloneList
(
pStmt
->
pOptions
->
pFuncs
);
SFunctionNode
*
pFunc
=
nodesMakeNode
(
QUERY_NODE_FUNCTION
);
if
(
NULL
==
pSelect
->
pProjectionList
||
NULL
==
pFunc
)
{
nodesDestroyNode
(
pSelect
);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
strcpy
(
pFunc
->
functionName
,
"_wstartts"
);
nodesListPushFront
(
pSelect
->
pProjectionList
,
pFunc
);
SNode
*
pProject
=
NULL
;
FOREACH
(
pProject
,
pSelect
->
pProjectionList
)
{
sprintf
(((
SExprNode
*
)
pProject
)
->
aliasName
,
"#sma_%p"
,
pProject
);
}
SIntervalWindowNode
*
pInterval
=
nodesMakeNode
(
QUERY_NODE_INTERVAL_WINDOW
);
if
(
NULL
==
pInterval
)
{
nodesDestroyNode
(
pSelect
);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
pSelect
->
pWindow
=
(
SNode
*
)
pInterval
;
pInterval
->
pCol
=
nodesMakeNode
(
QUERY_NODE_COLUMN
);
pInterval
->
pInterval
=
nodesCloneNode
(
pStmt
->
pOptions
->
pInterval
);
pInterval
->
pOffset
=
nodesCloneNode
(
pStmt
->
pOptions
->
pOffset
);
pInterval
->
pSliding
=
nodesCloneNode
(
pStmt
->
pOptions
->
pSliding
);
if
(
NULL
==
pInterval
->
pCol
||
NULL
==
pInterval
->
pInterval
||
(
NULL
!=
pStmt
->
pOptions
->
pOffset
&&
NULL
==
pInterval
->
pOffset
)
||
(
NULL
!=
pStmt
->
pOptions
->
pSliding
&&
NULL
==
pInterval
->
pSliding
))
{
nodesDestroyNode
(
pSelect
);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
((
SColumnNode
*
)
pInterval
->
pCol
)
->
colId
=
PRIMARYKEY_TIMESTAMP_COL_ID
;
strcpy
(((
SColumnNode
*
)
pInterval
->
pCol
)
->
colName
,
PK_TS_COL_INTERNAL_NAME
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
code
=
translateQuery
(
pCxt
,
(
SNode
*
)
pSelect
);
static
int32_t
getSmaIndexAst
(
STranslateContext
*
pCxt
,
SCreateIndexStmt
*
pStmt
,
char
**
pAst
,
int32_t
*
pLen
)
{
SSampleAstInfo
info
=
{
0
};
int32_t
code
=
buildSampleAstInfoByIndex
(
pCxt
,
pStmt
,
&
info
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodesNodeToString
(
pSelect
,
false
,
pAst
,
pLen
);
code
=
buildSampleAst
(
pCxt
,
&
info
,
pAst
,
pLen
);
}
nodesDestroyNode
(
pSelect
);
clearSampleAstInfo
(
&
info
);
return
code
;
}
...
...
@@ -2106,7 +2359,7 @@ static int32_t buildCreateSmaReq(STranslateContext* pCxt, SCreateIndexStmt* pStm
code
=
getSmaIndexExpr
(
pCxt
,
pStmt
,
&
pReq
->
expr
,
&
pReq
->
exprLen
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
getSmaIndex
Build
Ast
(
pCxt
,
pStmt
,
&
pReq
->
ast
,
&
pReq
->
astLen
);
code
=
getSmaIndexAst
(
pCxt
,
pStmt
,
&
pReq
->
ast
,
&
pReq
->
astLen
);
}
return
code
;
...
...
@@ -2129,10 +2382,12 @@ static int32_t translateCreateSmaIndex(STranslateContext* pCxt, SCreateIndexStmt
tFreeSMCreateSmaReq
(
&
createSmaReq
);
return
code
;
}
static
int32_t
buildCreateFullTextReq
(
STranslateContext
*
pCxt
,
SCreateIndexStmt
*
pStmt
,
SMCreateFullTextReq
*
pReq
)
{
// impl later
return
0
;
}
static
int32_t
translateCreateFullTextIndex
(
STranslateContext
*
pCxt
,
SCreateIndexStmt
*
pStmt
)
{
SMCreateFullTextReq
createFTReq
=
{
0
};
int32_t
code
=
buildCreateFullTextReq
(
pCxt
,
pStmt
,
&
createFTReq
);
...
...
@@ -2535,24 +2790,6 @@ int32_t extractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pS
return
TSDB_CODE_FAILED
;
}
static
void
destroyTranslateContext
(
STranslateContext
*
pCxt
)
{
if
(
NULL
!=
pCxt
->
pNsLevel
)
{
size_t
size
=
taosArrayGetSize
(
pCxt
->
pNsLevel
);
for
(
size_t
i
=
0
;
i
<
size
;
++
i
)
{
taosArrayDestroy
(
taosArrayGetP
(
pCxt
->
pNsLevel
,
i
));
}
taosArrayDestroy
(
pCxt
->
pNsLevel
);
}
if
(
NULL
!=
pCxt
->
pCmdMsg
)
{
taosMemoryFreeClear
(
pCxt
->
pCmdMsg
->
pMsg
);
taosMemoryFreeClear
(
pCxt
->
pCmdMsg
);
}
taosHashCleanup
(
pCxt
->
pDbs
);
taosHashCleanup
(
pCxt
->
pTables
);
}
static
const
char
*
getSysDbName
(
ENodeType
type
)
{
switch
(
type
)
{
case
QUERY_NODE_SHOW_DATABASES_STMT
:
...
...
@@ -2613,8 +2850,9 @@ static const char* getSysTableName(ENodeType type) {
case
QUERY_NODE_SHOW_LICENCE_STMT
:
return
TSDB_INS_TABLE_LICENCES
;
case
QUERY_NODE_SHOW_CONNECTIONS_STMT
:
return
TSDB_PERFS_TABLE_CONNECTIONS
;
case
QUERY_NODE_SHOW_QUERIES_STMT
:
// todo
return
TSDB_PERFS_TABLE_QUERIES
;
default:
break
;
}
...
...
@@ -2739,18 +2977,6 @@ typedef struct SVgroupTablesBatch {
char
dbName
[
TSDB_DB_NAME_LEN
];
}
SVgroupTablesBatch
;
static
void
toSchemaEx
(
const
SColumnDefNode
*
pCol
,
col_id_t
colId
,
SSchema
*
pSchema
)
{
int8_t
flags
=
0
;
if
(
pCol
->
sma
)
{
flags
|=
SCHEMA_SMA_ON
;
}
pSchema
->
colId
=
colId
;
pSchema
->
type
=
pCol
->
dataType
.
type
;
pSchema
->
bytes
=
calcTypeBytes
(
pCol
->
dataType
);
pSchema
->
flags
=
flags
;
strcpy
(
pSchema
->
name
,
pCol
->
colName
);
}
static
void
destroyCreateTbReq
(
SVCreateTbReq
*
pReq
)
{
taosMemoryFreeClear
(
pReq
->
name
);
taosMemoryFreeClear
(
pReq
->
ntbCfg
.
pSchema
);
...
...
@@ -2798,7 +3024,7 @@ static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt*
SNode
*
pCol
;
col_id_t
index
=
0
;
FOREACH
(
pCol
,
pStmt
->
pCols
)
{
toSchema
Ex
((
SColumnDefNode
*
)
pCol
,
index
+
1
,
req
.
ntbCfg
.
pSchema
+
index
);
toSchema
((
SColumnDefNode
*
)
pCol
,
index
+
1
,
req
.
ntbCfg
.
pSchema
+
index
);
++
index
;
}
if
(
TSDB_CODE_SUCCESS
!=
buildSmaParam
(
pStmt
->
pOptions
,
&
req
))
{
...
...
@@ -3224,19 +3450,11 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) {
}
int32_t
translate
(
SParseContext
*
pParseCxt
,
SQuery
*
pQuery
)
{
STranslateContext
cxt
=
{
.
pParseCxt
=
pParseCxt
,
.
errCode
=
TSDB_CODE_SUCCESS
,
.
msgBuf
=
{.
buf
=
pParseCxt
->
pMsg
,
.
len
=
pParseCxt
->
msgLen
},
.
pNsLevel
=
taosArrayInit
(
TARRAY_MIN_SIZE
,
POINTER_BYTES
),
.
currLevel
=
0
,
.
currClause
=
0
,
.
pDbs
=
taosHashInit
(
4
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_NO_LOCK
),
.
pTables
=
taosHashInit
(
4
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_NO_LOCK
)};
if
(
NULL
==
cxt
.
pNsLevel
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
STranslateContext
cxt
=
{
0
};
int32_t
code
=
initTranslateContext
(
pParseCxt
,
&
cxt
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
fmFuncMgtInit
();
}
int32_t
code
=
fmFuncMgtInit
();
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
rewriteQuery
(
&
cxt
,
pQuery
);
}
...
...
source/libs/parser/src/sql.c
浏览文件 @
ba06204d
因为 它太大了无法显示 source diff 。你可以改为
查看blob
。
source/libs/scalar/src/sclfunc.c
浏览文件 @
ba06204d
...
...
@@ -304,7 +304,7 @@ static int32_t doLengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarP
continue
;
}
char
*
in
=
pInputData
->
pData
+
pInputData
->
varmeta
.
offset
[
i
]
;
char
*
in
=
colDataGetData
(
pInputData
,
i
)
;
out
[
i
]
=
lenFn
(
in
,
type
);
}
...
...
@@ -395,11 +395,8 @@ int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
int16_t
dataLen
=
0
;
for
(
int32_t
i
=
0
;
i
<
inputNum
;
++
i
)
{
if
(
pInput
[
i
].
numOfRows
==
1
)
{
input
[
i
]
=
pInputData
[
i
]
->
pData
+
pInputData
[
i
]
->
varmeta
.
offset
[
0
];
}
else
{
input
[
i
]
=
pInputData
[
i
]
->
pData
+
pInputData
[
i
]
->
varmeta
.
offset
[
k
];
}
int32_t
rowIdx
=
(
pInput
[
i
].
numOfRows
==
1
)
?
0
:
k
;
input
[
i
]
=
colDataGetData
(
pInputData
[
i
],
rowIdx
);
ret
=
concatCopyHelper
(
input
[
i
],
output
,
hasNchar
,
GET_PARAM_TYPE
(
&
pInput
[
i
]),
&
dataLen
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -473,11 +470,8 @@ int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
break
;
}
if
(
pInput
[
i
].
numOfRows
==
1
)
{
input
[
i
]
=
pInputData
[
i
]
->
pData
+
pInputData
[
i
]
->
varmeta
.
offset
[
0
];
}
else
{
input
[
i
]
=
pInputData
[
i
]
->
pData
+
pInputData
[
i
]
->
varmeta
.
offset
[
k
];
}
int32_t
rowIdx
=
(
pInput
[
i
].
numOfRows
==
1
)
?
0
:
k
;
input
[
i
]
=
colDataGetData
(
pInputData
[
i
],
rowIdx
);
ret
=
concatCopyHelper
(
input
[
i
],
output
,
hasNchar
,
GET_PARAM_TYPE
(
&
pInput
[
i
]),
&
dataLen
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -534,7 +528,7 @@ static int32_t doCaseConvFunction(SScalarParam *pInput, int32_t inputNum, SScala
continue
;
}
char
*
input
=
pInputData
->
pData
+
pInputData
->
varmeta
.
offset
[
i
]
;
char
*
input
=
colDataGetData
(
pInput
[
0
].
columnData
,
i
)
;
int32_t
len
=
varDataLen
(
input
);
if
(
type
==
TSDB_DATA_TYPE_VARCHAR
)
{
for
(
int32_t
j
=
0
;
j
<
len
;
++
j
)
{
...
...
@@ -575,8 +569,8 @@ static int32_t doTrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarPar
colDataAppendNULL
(
pOutputData
,
i
);
continue
;
}
char
*
input
=
pInputData
->
pData
+
pInputData
->
varmeta
.
offset
[
i
];
char
*
input
=
colDataGetData
(
pInput
[
0
].
columnData
,
i
);
int32_t
len
=
varDataLen
(
input
);
int32_t
charLen
=
(
type
==
TSDB_DATA_TYPE_VARCHAR
)
?
len
:
len
/
TSDB_NCHAR_SIZE
;
trimFn
(
input
,
output
,
type
,
charLen
);
...
...
@@ -606,7 +600,7 @@ int32_t substrFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
int32_t
subLen
=
INT16_MAX
;
if
(
inputNum
==
3
)
{
GET_TYPED_DATA
(
subLen
,
int32_t
,
GET_PARAM_TYPE
(
&
pInput
[
2
]),
pInput
[
2
].
columnData
->
pData
);
if
(
subLen
<
0
)
{
//subLen cannot be negative
if
(
subLen
<
0
||
subLen
>
INT16_MAX
)
{
//subLen cannot be negative
return
TSDB_CODE_FAILED
;
}
subLen
=
(
GET_PARAM_TYPE
(
pInput
)
==
TSDB_DATA_TYPE_VARCHAR
)
?
subLen
:
subLen
*
TSDB_NCHAR_SIZE
;
...
...
@@ -615,19 +609,16 @@ int32_t substrFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
SColumnInfoData
*
pInputData
=
pInput
->
columnData
;
SColumnInfoData
*
pOutputData
=
pOutput
->
columnData
;
char
*
input
=
pInputData
->
pData
+
pInputData
->
varmeta
.
offset
[
0
];
char
*
output
=
NULL
;
int32_t
outputLen
=
pInputData
->
varmeta
.
length
*
pInput
->
numOfRows
;
char
*
outputBuf
=
taosMemoryCalloc
(
outputLen
,
1
);
output
=
outputBuf
;
char
*
output
=
outputBuf
;
for
(
int32_t
i
=
0
;
i
<
pInput
->
numOfRows
;
++
i
)
{
if
(
colDataIsNull_s
(
pInputData
,
i
))
{
colDataAppendNULL
(
pOutputData
,
i
);
continue
;
}
char
*
input
=
colDataGetData
(
pInput
[
0
].
columnData
,
i
);
int32_t
len
=
varDataLen
(
input
);
int32_t
startPosBytes
;
...
...
@@ -646,7 +637,6 @@ int32_t substrFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
varDataSetLen
(
output
,
resLen
);
colDataAppend
(
pOutputData
,
i
,
output
,
false
);
input
+=
varDataTLen
(
input
);
output
+=
varDataTLen
(
output
);
}
...
...
@@ -799,13 +789,13 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp
int32_t
toISO8601Function
(
SScalarParam
*
pInput
,
int32_t
inputNum
,
SScalarParam
*
pOutput
)
{
int32_t
type
=
GET_PARAM_TYPE
(
pInput
);
char
*
input
=
pInput
[
0
].
columnData
->
pData
;
for
(
int32_t
i
=
0
;
i
<
pInput
[
0
].
numOfRows
;
++
i
)
{
if
(
colDataIsNull_s
(
pInput
[
0
].
columnData
,
i
))
{
colDataAppendNULL
(
pOutput
->
columnData
,
i
);
continue
;
}
char
*
input
=
colDataGetData
(
pInput
[
0
].
columnData
,
i
);
char
fraction
[
20
]
=
{
0
};
bool
hasFraction
=
false
;
NUM_TO_STRING
(
type
,
input
,
sizeof
(
fraction
),
fraction
);
...
...
@@ -822,7 +812,8 @@ int32_t toISO8601Function(SScalarParam *pInput, int32_t inputNum, SScalarParam *
}
else
if
(
tsDigits
==
TSDB_TIME_PRECISION_NANO_DIGITS
)
{
timeVal
=
timeVal
/
(
1000
*
1000
*
1000
);
}
else
{
assert
(
0
);
colDataAppendNULL
(
pOutput
->
columnData
,
i
);
continue
;
}
hasFraction
=
true
;
memmove
(
fraction
,
fraction
+
TSDB_TIME_PRECISION_SEC_DIGITS
,
TSDB_TIME_PRECISION_SEC_DIGITS
);
...
...
@@ -852,7 +843,6 @@ int32_t toISO8601Function(SScalarParam *pInput, int32_t inputNum, SScalarParam *
varDataSetLen
(
buf
,
len
);
colDataAppend
(
pOutput
->
columnData
,
i
,
buf
,
false
);
input
+=
tDataTypes
[
type
].
bytes
;
}
pOutput
->
numOfRows
=
pInput
->
numOfRows
;
...
...
@@ -864,12 +854,12 @@ int32_t toUnixtimestampFunction(SScalarParam *pInput, int32_t inputNum, SScalarP
int32_t
type
=
GET_PARAM_TYPE
(
pInput
);
int32_t
timePrec
=
GET_PARAM_PRECISON
(
pInput
);
char
*
input
=
pInput
[
0
].
columnData
->
pData
+
pInput
[
0
].
columnData
->
varmeta
.
offset
[
0
];
for
(
int32_t
i
=
0
;
i
<
pInput
[
0
].
numOfRows
;
++
i
)
{
if
(
colDataIsNull_s
(
pInput
[
0
].
columnData
,
i
))
{
colDataAppendNULL
(
pOutput
->
columnData
,
i
);
continue
;
}
char
*
input
=
colDataGetData
(
pInput
[
0
].
columnData
,
i
);
int64_t
timeVal
=
0
;
int32_t
ret
=
convertStringToTimestamp
(
type
,
input
,
timePrec
,
&
timeVal
);
...
...
@@ -879,7 +869,6 @@ int32_t toUnixtimestampFunction(SScalarParam *pInput, int32_t inputNum, SScalarP
}
colDataAppend
(
pOutput
->
columnData
,
i
,
(
char
*
)
&
timeVal
,
false
);
input
+=
varDataTLen
(
input
);
}
pOutput
->
numOfRows
=
pInput
->
numOfRows
;
...
...
@@ -897,19 +886,14 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara
int64_t
factor
=
(
timePrec
==
TSDB_TIME_PRECISION_MILLI
)
?
1000
:
(
timePrec
==
TSDB_TIME_PRECISION_MICRO
?
1000000
:
1000000000
);
char
*
input
=
NULL
;
if
(
IS_VAR_DATA_TYPE
(
type
))
{
input
=
pInput
[
0
].
columnData
->
pData
+
pInput
[
0
].
columnData
->
varmeta
.
offset
[
0
];
}
else
{
input
=
pInput
[
0
].
columnData
->
pData
;
}
for
(
int32_t
i
=
0
;
i
<
pInput
[
0
].
numOfRows
;
++
i
)
{
if
(
colDataIsNull_s
(
pInput
[
0
].
columnData
,
i
))
{
colDataAppendNULL
(
pOutput
->
columnData
,
i
);
continue
;
}
char
*
input
=
colDataGetData
(
pInput
[
0
].
columnData
,
i
);
if
(
IS_VAR_DATA_TYPE
(
type
))
{
/* datetime format strings */
int32_t
ret
=
convertStringToTimestamp
(
type
,
input
,
TSDB_TIME_PRECISION_NANO
,
&
timeVal
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -959,7 +943,8 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara
}
else
if
(
tsDigits
<=
TSDB_TIME_PRECISION_SEC_DIGITS
){
timeVal
=
timeVal
*
factor
;
}
else
{
assert
(
0
);
colDataAppendNULL
(
pOutput
->
columnData
,
i
);
continue
;
}
break
;
}
...
...
@@ -973,7 +958,8 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara
}
else
if
(
tsDigits
<=
TSDB_TIME_PRECISION_SEC_DIGITS
)
{
timeVal
=
timeVal
*
factor
;
}
else
{
assert
(
0
);
colDataAppendNULL
(
pOutput
->
columnData
,
i
);
continue
;
}
break
;
}
...
...
@@ -987,7 +973,8 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara
}
else
if
(
tsDigits
<=
TSDB_TIME_PRECISION_SEC_DIGITS
)
{
timeVal
=
timeVal
*
factor
/
factor
/
60
*
60
*
factor
;
}
else
{
assert
(
0
);
colDataAppendNULL
(
pOutput
->
columnData
,
i
);
continue
;
}
break
;
}
...
...
@@ -1001,7 +988,8 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara
}
else
if
(
tsDigits
<=
TSDB_TIME_PRECISION_SEC_DIGITS
)
{
timeVal
=
timeVal
*
factor
/
factor
/
3600
*
3600
*
factor
;
}
else
{
assert
(
0
);
colDataAppendNULL
(
pOutput
->
columnData
,
i
);
continue
;
}
break
;
}
...
...
@@ -1015,7 +1003,8 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara
}
else
if
(
tsDigits
<=
TSDB_TIME_PRECISION_SEC_DIGITS
)
{
timeVal
=
timeVal
*
factor
/
factor
/
86400
*
86400
*
factor
;
}
else
{
assert
(
0
);
colDataAppendNULL
(
pOutput
->
columnData
,
i
);
continue
;
}
break
;
}
...
...
@@ -1029,7 +1018,8 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara
}
else
if
(
tsDigits
<=
TSDB_TIME_PRECISION_SEC_DIGITS
)
{
timeVal
=
timeVal
*
factor
/
factor
/
604800
*
604800
*
factor
;
}
else
{
assert
(
0
);
colDataAppendNULL
(
pOutput
->
columnData
,
i
);
continue
;
}
break
;
}
...
...
@@ -1068,11 +1058,6 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara
}
colDataAppend
(
pOutput
->
columnData
,
i
,
(
char
*
)
&
timeVal
,
false
);
if
(
IS_VAR_DATA_TYPE
(
type
))
{
input
+=
varDataTLen
(
input
);
}
else
{
input
+=
tDataTypes
[
type
].
bytes
;
}
}
pOutput
->
numOfRows
=
pInput
->
numOfRows
;
...
...
@@ -1094,12 +1079,6 @@ int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
type
!=
TSDB_DATA_TYPE_BINARY
&&
type
!=
TSDB_DATA_TYPE_NCHAR
)
{
return
TSDB_CODE_FAILED
;
}
if
(
IS_VAR_DATA_TYPE
(
type
))
{
input
[
k
]
=
pInput
[
k
].
columnData
->
pData
+
pInput
[
k
].
columnData
->
varmeta
.
offset
[
0
];
}
else
{
input
[
k
]
=
pInput
[
k
].
columnData
->
pData
;
}
}
for
(
int32_t
i
=
0
;
i
<
pInput
[
0
].
numOfRows
;
++
i
)
{
...
...
@@ -1109,6 +1088,9 @@ int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
continue
;
}
int32_t
rowIdx
=
(
pInput
[
k
].
numOfRows
==
1
)
?
0
:
i
;
input
[
k
]
=
colDataGetData
(
pInput
[
k
].
columnData
,
rowIdx
);
int32_t
type
=
GET_PARAM_TYPE
(
&
pInput
[
k
]);
if
(
IS_VAR_DATA_TYPE
(
type
))
{
/* datetime format strings */
int32_t
ret
=
convertStringToTimestamp
(
type
,
input
[
k
],
TSDB_TIME_PRECISION_NANO
,
&
timeVal
[
k
]);
...
...
@@ -1138,14 +1120,9 @@ int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
timeVal
[
k
]
=
timeVal
[
k
]
*
1000
;
}
else
if
(
tsDigits
==
TSDB_TIME_PRECISION_NANO_DIGITS
)
{
timeVal
[
k
]
=
timeVal
[
k
];
}
}
if
(
pInput
[
k
].
numOfRows
!=
1
)
{
if
(
IS_VAR_DATA_TYPE
(
type
))
{
input
[
k
]
+=
varDataTLen
(
input
[
k
]);
}
else
{
input
[
k
]
+=
tDataTypes
[
type
].
bytes
;
colDataAppendNULL
(
pOutput
->
columnData
,
i
);
continue
;
}
}
}
...
...
tests/script/jenkins/basic.txt
浏览文件 @
ba06204d
...
...
@@ -55,7 +55,7 @@
# ---- tmq
./test.sh -f tsim/tmq/basic.sim
./test.sh -f tsim/tmq/basic1.sim
#
./test.sh -f tsim/tmq/basic1.sim
#./test.sh -f tsim/tmq/oneTopic.sim
#./test.sh -f tsim/tmq/multiTopic.sim
...
...
tests/script/tsim/tmq/consume.sh
0 → 100755
浏览文件 @
ba06204d
#!/bin/bash
##################################################
#
# Do tmq test
#
##################################################
set
+e
# set default value for parameters
EXEC_OPTON
=
start
DB_NAME
=
db
POLL_DELAY
=
5
VALGRIND
=
0
SIGNAL
=
SIGINT
while
getopts
"d:s:v:y:x:"
arg
do
case
$arg
in
d
)
DB_NAME
=
$OPTARG
;;
s
)
EXEC_OPTON
=
$OPTARG
;;
v
)
VALGRIND
=
1
;;
y
)
POLL_DELAY
=
$OPTARG
;;
x
)
SIGNAL
=
$OPTARG
;;
?
)
echo
"unkown argument"
;;
esac
done
SCRIPT_DIR
=
`
pwd
`
IN_TDINTERNAL
=
"community"
if
[[
"
$SCRIPT_DIR
"
==
*
"
$IN_TDINTERNAL
"
*
]]
;
then
cd
../../..
else
cd
../../
fi
TOP_DIR
=
`
pwd
`
if
[[
"
$SCRIPT_DIR
"
==
*
"
$IN_TDINTERNAL
"
*
]]
;
then
BIN_DIR
=
`
find
.
-name
"tmq_sim"
|grep bin|head
-n1
|cut
-d
'/'
-f
2,3
`
else
BIN_DIR
=
`
find
.
-name
"tmq_sim"
|grep bin|head
-n1
|cut
-d
'/'
-f
2
`
fi
declare
-x
BUILD_DIR
=
$TOP_DIR
/
$BIN_DIR
declare
-x
SIM_DIR
=
$TOP_DIR
/sim
PROGRAM
=
$BUILD_DIR
/build/bin/tmq_sim
PRG_DIR
=
$SIM_DIR
/tsim
CFG_DIR
=
$PRG_DIR
/cfg
LOG_DIR
=
$PRG_DIR
/log
echo
"------------------------------------------------------------------------"
echo
"BUILD_DIR:
$BUILD_DIR
"
echo
"SIM_DIR :
$SIM_DIR
"
echo
"CFG_DIR :
$CFG_DIR
"
echo
"PROGRAM:
$PROGRAM
echo "
CFG_DIR:
$CFG_DIR
echo
"POLL_DELAY:
$POLL_DELAY
echo "
DB_NAME:
$DB_NAME
echo
"------------------------------------------------------------------------"
if
[
"
$EXEC_OPTON
"
=
"start"
]
;
then
if
[
$VALGRIND
-eq
1
]
;
then
echo nohup
valgrind
--tool
=
memcheck
--leak-check
=
full
--show-reachable
=
no
--track-origins
=
yes
--show-leak-kinds
=
all
-v
--workaround-gcc296-bugs
=
yes
--log-file
=
${
LOG_DIR
}
/valgrind-tmq_sim.log
$PROGRAM
-c
$CFG_DIR
-d
$DB_NAME
-y
$POLL_DELAY
>
/dev/null 2>&1 &
nohup
valgrind
--tool
=
memcheck
--leak-check
=
full
--show-reachable
=
no
--track-origins
=
yes
--show-leak-kinds
=
all
-v
--workaround-gcc296-bugs
=
yes
--log-file
=
${
LOG_DIR
}
/valgrind-tmq_sim.log
$PROGRAM
-c
$CFG_DIR
-d
$DB_NAME
-y
$POLL_DELAY
>
/dev/null 2>&1 &
else
echo
"nohup
$PROGRAM
-c
$CFG_DIR
-d
$DB_NAME
-y
$POLL_DELAY
> /dev/null 2>&1 &"
nohup
$PROGRAM
-c
$CFG_DIR
-y
$POLL_DELAY
-d
$DB_NAME
>
/dev/null 2>&1 &
fi
else
PID
=
`
ps
-ef
|grep tmq_sim |
grep
-v
grep
|
awk
'{print $2}'
`
while
[
-n
"
$PID
"
]
do
if
[
"
$SIGNAL
"
=
"SIGKILL"
]
;
then
echo
try to
kill
by signal SIGKILL
kill
-9
$PID
else
echo
try to
kill
by signal SIGINT
kill
-SIGINT
$PID
fi
sleep
1
PID
=
`
ps
-ef
|grep tmq_sim |
grep
-v
grep
|
awk
'{print $2}'
`
done
fi
tests/test/c/tmqSim.c
浏览文件 @
ba06204d
...
...
@@ -33,26 +33,21 @@
#define MAX_SQL_STR_LEN (1024 * 1024)
#define MAX_ROW_STR_LEN (16 * 1024)
#define MAX_CONSUMER_THREAD_CNT (16)
typedef
struct
{
int32_t
expectMsgCnt
;
int32_t
consumeMsgCnt
;
TdThread
thread
;
}
SThreadInfo
;
int32_t
consumerId
;
typedef
struct
{
// input from argvs
char
dbName
[
32
];
char
topicString
[
256
];
int32_t
ifCheckData
;
int64_t
expectMsgCnt
;
int64_t
consumeMsgCnt
;
int32_t
checkresult
;
char
topicString
[
1024
];
char
keyString
[
1024
];
char
topicString1
[
256
];
char
keyString1
[
1024
];
int32_t
showMsgFlag
;
int32_t
consumeDelay
;
// unit s
int32_t
consumeMsgCnt
;
int32_t
checkMode
;
// save result after parse agrvs
int32_t
numOfTopic
;
char
topics
[
32
][
64
];
...
...
@@ -60,15 +55,22 @@ typedef struct {
char
key
[
32
][
64
];
char
value
[
32
][
64
];
int32_t
numOfTopic1
;
char
topics1
[
32
][
64
];
tmq_t
*
tmq
;
tmq_list_t
*
topicList
;
}
SThreadInfo
;
int32_t
numOfKey1
;
char
key1
[
32
][
64
];
char
value1
[
32
][
64
];
typedef
struct
{
// input from argvs
char
dbName
[
32
];
int32_t
showMsgFlag
;
int32_t
consumeDelay
;
// unit s
int32_t
numOfThread
;
SThreadInfo
stThreads
[
MAX_CONSUMER_THREAD_CNT
];
}
SConfInfo
;
static
SConfInfo
g_stConfInfo
;
TdFilePtr
g_fp
=
NULL
;
// char* g_pRowValue = NULL;
// TdFilePtr g_fp = NULL;
...
...
@@ -81,30 +83,54 @@ static void printHelp() {
printf
(
"%s%s%s%s
\n
"
,
indent
,
indent
,
"Configuration directory, default is "
,
configDir
);
printf
(
"%s%s
\n
"
,
indent
,
"-d"
);
printf
(
"%s%s%s
\n
"
,
indent
,
indent
,
"The name of the database for cosumer, no default "
);
printf
(
"%s%s
\n
"
,
indent
,
"-t"
);
printf
(
"%s%s%s
\n
"
,
indent
,
indent
,
"The topic string for cosumer, no default "
);
printf
(
"%s%s
\n
"
,
indent
,
"-k"
);
printf
(
"%s%s%s
\n
"
,
indent
,
indent
,
"The key-value string for cosumer, no default "
);
printf
(
"%s%s
\n
"
,
indent
,
"-t1"
);
printf
(
"%s%s%s
\n
"
,
indent
,
indent
,
"The topic1 string for cosumer, no default "
);
printf
(
"%s%s
\n
"
,
indent
,
"-k1"
);
printf
(
"%s%s%s
\n
"
,
indent
,
indent
,
"The key1-value1 string for cosumer, no default "
);
printf
(
"%s%s
\n
"
,
indent
,
"-g"
);
printf
(
"%s%s%s%d
\n
"
,
indent
,
indent
,
"showMsgFlag, default is "
,
g_stConfInfo
.
showMsgFlag
);
printf
(
"%s%s
\n
"
,
indent
,
"-y"
);
printf
(
"%s%s%s%d
\n
"
,
indent
,
indent
,
"consume delay, default is s"
,
g_stConfInfo
.
consumeDelay
);
printf
(
"%s%s
\n
"
,
indent
,
"-m"
);
printf
(
"%s%s%s%d
\n
"
,
indent
,
indent
,
"consume msg count, default is s"
,
g_stConfInfo
.
consumeMsgCnt
);
printf
(
"%s%s
\n
"
,
indent
,
"-j"
);
printf
(
"%s%s%s%d
\n
"
,
indent
,
indent
,
"check mode, default is s"
,
g_stConfInfo
.
checkMode
);
exit
(
EXIT_SUCCESS
);
}
void
initLogFile
()
{
// FILE *fp = fopen(g_stConfInfo.resultFileName, "a");
TdFilePtr
pFile
=
taosOpenFile
(
"./tmqlog.txt"
,
TD_FILE_CREATE
|
TD_FILE_WRITE
|
TD_FILE_APPEND
|
TD_FILE_STREAM
);
if
(
NULL
==
pFile
)
{
fprintf
(
stderr
,
"Failed to open %s for save result
\n
"
,
"./tmqlog.txt"
);
exit
-
1
;
};
g_fp
=
pFile
;
time_t
tTime
=
taosGetTimestampSec
();
struct
tm
tm
=
*
taosLocalTime
(
&
tTime
,
NULL
);
taosFprintfFile
(
pFile
,
"###################################################################
\n
"
);
taosFprintfFile
(
pFile
,
"# configDir: %s
\n
"
,
configDir
);
taosFprintfFile
(
pFile
,
"# dbName: %s
\n
"
,
g_stConfInfo
.
dbName
);
taosFprintfFile
(
pFile
,
"# showMsgFlag: %d
\n
"
,
g_stConfInfo
.
showMsgFlag
);
taosFprintfFile
(
pFile
,
"# consumeDelay: %d
\n
"
,
g_stConfInfo
.
consumeDelay
);
for
(
int32_t
i
=
0
;
i
<
g_stConfInfo
.
numOfThread
;
i
++
)
{
taosFprintfFile
(
pFile
,
"# consumer %d info:
\n
"
,
g_stConfInfo
.
stThreads
[
i
].
consumerId
);
taosFprintfFile
(
pFile
,
" Topics: "
);
for
(
int
i
=
0
;
i
<
g_stConfInfo
.
stThreads
[
i
].
numOfTopic
;
i
++
)
{
taosFprintfFile
(
pFile
,
"%s, "
,
g_stConfInfo
.
stThreads
[
i
].
topics
[
i
]);
}
taosFprintfFile
(
pFile
,
"
\n
"
);
taosFprintfFile
(
pFile
,
" Key: "
);
for
(
int
i
=
0
;
i
<
g_stConfInfo
.
stThreads
[
i
].
numOfKey
;
i
++
)
{
taosFprintfFile
(
pFile
,
"%s:%s, "
,
g_stConfInfo
.
stThreads
[
i
].
key
[
i
],
g_stConfInfo
.
stThreads
[
i
].
value
[
i
]);
}
taosFprintfFile
(
pFile
,
"
\n
"
);
}
taosFprintfFile
(
pFile
,
"# Test time: %d-%02d-%02d %02d:%02d:%02d
\n
"
,
tm
.
tm_year
+
1900
,
tm
.
tm_mon
+
1
,
tm
.
tm_mday
,
tm
.
tm_hour
,
tm
.
tm_min
,
tm
.
tm_sec
);
taosFprintfFile
(
pFile
,
"###################################################################
\n
"
);
}
void
parseArgument
(
int32_t
argc
,
char
*
argv
[])
{
memset
(
&
g_stConfInfo
,
0
,
sizeof
(
SConfInfo
));
g_stConfInfo
.
showMsgFlag
=
0
;
g_stConfInfo
.
consumeDelay
=
8000
;
g_stConfInfo
.
consumeMsgCnt
=
0
;
g_stConfInfo
.
consumeDelay
=
5
;
for
(
int32_t
i
=
1
;
i
<
argc
;
i
++
)
{
if
(
strcmp
(
argv
[
i
],
"-h"
)
==
0
||
strcmp
(
argv
[
i
],
"--help"
)
==
0
)
{
...
...
@@ -114,37 +140,20 @@ void parseArgument(int32_t argc, char* argv[]) {
strcpy
(
g_stConfInfo
.
dbName
,
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-c"
)
==
0
)
{
strcpy
(
configDir
,
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-t"
)
==
0
)
{
strcpy
(
g_stConfInfo
.
topicString
,
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-k"
)
==
0
)
{
strcpy
(
g_stConfInfo
.
keyString
,
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-t1"
)
==
0
)
{
strcpy
(
g_stConfInfo
.
topicString1
,
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-k1"
)
==
0
)
{
strcpy
(
g_stConfInfo
.
keyString1
,
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-g"
)
==
0
)
{
g_stConfInfo
.
showMsgFlag
=
atol
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-y"
)
==
0
)
{
g_stConfInfo
.
consumeDelay
=
atol
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-m"
)
==
0
)
{
g_stConfInfo
.
consumeMsgCnt
=
atol
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-j"
)
==
0
)
{
g_stConfInfo
.
checkMode
=
atol
(
argv
[
++
i
]);
}
else
{
printf
(
"%s unknow para: %s %s"
,
GREEN
,
argv
[
++
i
],
NC
);
exit
(
-
1
);
}
}
if
(
0
==
g_stConfInfo
.
consumeMsgCnt
)
{
g_stConfInfo
.
consumeMsgCnt
=
0x7fffffff
;
}
#if 0
#if 1
pPrint
(
"%s configDir:%s %s"
,
GREEN
,
configDir
,
NC
);
pPrint
(
"%s dbName:%s %s"
,
GREEN
,
g_stConfInfo
.
dbName
,
NC
);
pPrint("%s topicString:%s %s", GREEN, g_stConfInfo.topicString, NC);
pPrint("%s keyString:%s %s", GREEN, g_stConfInfo.keyString, NC);
pPrint
(
"%s consumeDelay:%d %s"
,
GREEN
,
g_stConfInfo
.
consumeDelay
,
NC
);
pPrint
(
"%s showMsgFlag:%d %s"
,
GREEN
,
g_stConfInfo
.
showMsgFlag
,
NC
);
#endif
}
...
...
@@ -171,74 +180,26 @@ void ltrim(char* str) {
// return str;
}
void
parseInputString
()
{
// printf("topicString: %s\n", g_stConfInfo.topicString);
// printf("keyString: %s\n\n", g_stConfInfo.keyString);
char
*
token
;
const
char
delim
[
2
]
=
","
;
const
char
ch
=
':'
;
token
=
strtok
(
g_stConfInfo
.
topicString
,
delim
);
while
(
token
!=
NULL
)
{
// printf("%s\n", token );
strcpy
(
g_stConfInfo
.
topics
[
g_stConfInfo
.
numOfTopic
],
token
);
ltrim
(
g_stConfInfo
.
topics
[
g_stConfInfo
.
numOfTopic
]);
// printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]);
g_stConfInfo
.
numOfTopic
++
;
token
=
strtok
(
NULL
,
delim
);
}
token
=
strtok
(
g_stConfInfo
.
topicString1
,
delim
);
while
(
token
!=
NULL
)
{
// printf("%s\n", token );
strcpy
(
g_stConfInfo
.
topics1
[
g_stConfInfo
.
numOfTopic1
],
token
);
ltrim
(
g_stConfInfo
.
topics1
[
g_stConfInfo
.
numOfTopic1
]);
// printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]);
g_stConfInfo
.
numOfTopic1
++
;
token
=
strtok
(
NULL
,
delim
);
}
token
=
strtok
(
g_stConfInfo
.
keyString
,
delim
);
while
(
token
!=
NULL
)
{
// printf("%s\n", token );
{
char
*
pstr
=
token
;
ltrim
(
pstr
);
char
*
ret
=
strchr
(
pstr
,
ch
);
memcpy
(
g_stConfInfo
.
key
[
g_stConfInfo
.
numOfKey
],
pstr
,
ret
-
pstr
);
strcpy
(
g_stConfInfo
.
value
[
g_stConfInfo
.
numOfKey
],
ret
+
1
);
// printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey],
// g_stConfInfo.value[g_stConfInfo.numOfKey]);
g_stConfInfo
.
numOfKey
++
;
}
token
=
strtok
(
NULL
,
delim
);
}
token
=
strtok
(
g_stConfInfo
.
keyString1
,
delim
);
while
(
token
!=
NULL
)
{
// printf("%s\n", token );
{
char
*
pstr
=
token
;
ltrim
(
pstr
);
char
*
ret
=
strchr
(
pstr
,
ch
);
memcpy
(
g_stConfInfo
.
key1
[
g_stConfInfo
.
numOfKey1
],
pstr
,
ret
-
pstr
);
strcpy
(
g_stConfInfo
.
value1
[
g_stConfInfo
.
numOfKey1
],
ret
+
1
);
// printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey],
// g_stConfInfo.value[g_stConfInfo.numOfKey]);
g_stConfInfo
.
numOfKey1
++
;
}
token
=
strtok
(
NULL
,
delim
);
static
int
running
=
1
;
static
void
msg_process
(
TAOS_RES
*
msg
,
int32_t
msgIndex
,
int32_t
threadLable
)
{
char
buf
[
1024
];
//printf("topic: %s\n", tmq_get_topic_name(msg));
//printf("vg:%d\n", tmq_get_vgroup_id(msg));
taosFprintfFile
(
g_fp
,
"msg index:%d, threadLable: %d
\n
"
,
msgIndex
,
threadLable
);
taosFprintfFile
(
g_fp
,
"topic: %s, vgroupId: %d
\n
"
,
tmq_get_topic_name
(
msg
),
tmq_get_vgroup_id
(
msg
));
while
(
1
)
{
TAOS_ROW
row
=
taos_fetch_row
(
msg
);
if
(
row
==
NULL
)
break
;
TAOS_FIELD
*
fields
=
taos_fetch_fields
(
msg
);
int32_t
numOfFields
=
taos_field_count
(
msg
);
//taos_print_row(buf, row, fields, numOfFields);
//printf("%s\n", buf);
//taosFprintfFile(g_fp, "%s\n", buf);
}
}
static
int
running
=
1
;
/*static void msg_process(tmq_message_t* message) { tmqShowMsg(message); }*/
int
queryDB
(
TAOS
*
taos
,
char
*
command
)
{
TAOS_RES
*
pRes
=
taos_query
(
taos
,
command
);
int
code
=
taos_errno
(
pRes
);
...
...
@@ -252,8 +213,7 @@ int queryDB(TAOS* taos, char* command) {
return
0
;
}
tmq_t
*
build_consumer
()
{
#if 0
void
build_consumer
(
SThreadInfo
*
pInfo
)
{
char
sqlStr
[
1024
]
=
{
0
};
TAOS
*
pConn
=
taos_connect
(
NULL
,
"root"
,
"taosdata"
,
NULL
,
0
);
...
...
@@ -267,135 +227,68 @@ tmq_t* build_consumer() {
exit
(
-
1
);
}
taos_free_result
(
pRes
);
#endif
tmq_conf_t
*
conf
=
tmq_conf_new
();
// tmq_conf_set(conf, "group.id", "tg2");
for
(
int32_t
i
=
0
;
i
<
g_stConfInfo
.
numOfKey
;
i
++
)
{
tmq_conf_set
(
conf
,
g_stConfInfo
.
key
[
i
],
g_stConfInfo
.
value
[
i
]);
}
tmq_conf_set
(
conf
,
"td.connect.user"
,
"root"
);
tmq_conf_set
(
conf
,
"td.connect.pass"
,
"taosdata"
);
tmq_conf_set
(
conf
,
"td.connect.db"
,
g_stConfInfo
.
dbName
);
tmq_t
*
tmq
=
tmq_consumer_new1
(
conf
,
NULL
,
0
);
assert
(
tmq
);
tmq_conf_destroy
(
conf
);
return
tmq
;
for
(
int32_t
i
=
0
;
i
<
pInfo
->
numOfKey
;
i
++
)
{
tmq_conf_set
(
conf
,
pInfo
->
key
[
i
],
pInfo
->
value
[
i
]);
}
pInfo
->
tmq
=
tmq_consumer_new
(
pConn
,
conf
,
NULL
,
0
);
return
;
}
tmq_list_t
*
build_topic_list
(
)
{
tmq_list_t
*
topic_l
ist
=
tmq_list_new
();
void
build_topic_list
(
SThreadInfo
*
pInfo
)
{
pInfo
->
topicL
ist
=
tmq_list_new
();
// tmq_list_append(topic_list, "test_stb_topic_1");
for
(
int32_t
i
=
0
;
i
<
g_stConfInfo
.
numOfTopic
;
i
++
)
{
tmq_list_append
(
topic_list
,
g_stConfInfo
.
topics
[
i
]);
for
(
int32_t
i
=
0
;
i
<
pInfo
->
numOfTopic
;
i
++
)
{
tmq_list_append
(
pInfo
->
topicList
,
pInfo
->
topics
[
i
]);
}
return
topic_list
;
return
;
}
tmq_t
*
build_consumer_x
()
{
#if 0
int32_t
saveConsumeResult
(
SThreadInfo
*
pInfo
)
{
char
sqlStr
[
1024
]
=
{
0
};
TAOS
*
pConn
=
taos_connect
(
NULL
,
"root"
,
"taosdata"
,
NULL
,
0
);
assert
(
pConn
!=
NULL
);
sprintf(sqlStr, "use %s", g_stConfInfo.dbName);
// schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
sprintf
(
sqlStr
,
"insert into %s.consumeresult values (now, %d, %"
PRId64
", %d)"
,
g_stConfInfo
.
dbName
,
pInfo
->
consumerId
,
pInfo
->
consumeMsgCnt
,
pInfo
->
checkresult
);
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
sqlStr
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf("error in
use db
, reason:%s\n", taos_errstr(pRes));
printf
(
"error in
save consumeinfo
, reason:%s
\n
"
,
taos_errstr
(
pRes
));
taos_free_result
(
pRes
);
exit
(
-
1
);
}
taos_free_result(pRes);
#endif
tmq_conf_t
*
conf
=
tmq_conf_new
();
// tmq_conf_set(conf, "group.id", "tg2");
for
(
int32_t
i
=
0
;
i
<
g_stConfInfo
.
numOfKey1
;
i
++
)
{
tmq_conf_set
(
conf
,
g_stConfInfo
.
key1
[
i
],
g_stConfInfo
.
value1
[
i
]);
}
tmq_conf_set
(
conf
,
"td.connect.user"
,
"root"
);
tmq_conf_set
(
conf
,
"td.connect.pass"
,
"taosdata"
);
tmq_conf_set
(
conf
,
"td.connect.db"
,
g_stConfInfo
.
dbName
);
tmq_t
*
tmq
=
tmq_consumer_new1
(
conf
,
NULL
,
0
);
assert
(
tmq
);
tmq_conf_destroy
(
conf
);
return
tmq
;
}
taos_free_result
(
pRes
);
tmq_list_t
*
build_topic_list_x
()
{
tmq_list_t
*
topic_list
=
tmq_list_new
();
// tmq_list_append(topic_list, "test_stb_topic_1");
for
(
int32_t
i
=
0
;
i
<
g_stConfInfo
.
numOfTopic1
;
i
++
)
{
tmq_list_append
(
topic_list
,
g_stConfInfo
.
topics1
[
i
]);
}
return
topic_list
;
return
0
;
}
void
loop_consume
(
tmq_t
*
tmq
)
{
void
loop_consume
(
SThreadInfo
*
pInfo
)
{
tmq_resp_err_t
err
;
int
32
_t
totalMsgs
=
0
;
int32
_t
totalRows
=
0
;
int32_t
skipLogNum
=
0
;
int
64
_t
totalMsgs
=
0
;
//int64
_t totalRows = 0;
while
(
running
)
{
TAOS_RES
*
tmqMsg
=
tmq_consumer_poll
(
tmq
,
8
000
);
TAOS_RES
*
tmqMsg
=
tmq_consumer_poll
(
pInfo
->
tmq
,
g_stConfInfo
.
consumeDelay
*
1
000
);
if
(
tmqMsg
)
{
totalMsgs
++
;
#if 0
TAOS_ROW row;
while (NULL != (row = tmq_get_row(tmqMsg))) {
totalRows++;
}
#endif
/*skipLogNum += tmqGetSkipLogNum(tmqMsg);*/
if
(
0
!=
g_stConfInfo
.
showMsgFlag
)
{
/*msg_process(tmqMsg);*/
}
tmq_message_destroy
(
tmqMsg
);
}
else
{
break
;
}
msg_process
(
tmqMsg
,
totalMsgs
,
0
);
}
err
=
tmq_consumer_close
(
tmq
);
if
(
err
)
{
printf
(
"tmq_consumer_close() fail, reason: %s
\n
"
,
tmq_err2str
(
err
));
exit
(
-
1
);
}
printf
(
"{consume success: %d, %d}"
,
totalMsgs
,
totalRows
);
}
int32_t
parallel_consume
(
tmq_t
*
tmq
,
int
threadLable
)
{
tmq_resp_err_t
err
;
tmq_message_destroy
(
tmqMsg
);
int32_t
totalMsgs
=
0
;
int32_t
totalRows
=
0
;
int32_t
skipLogNum
=
0
;
while
(
running
)
{
TAOS_RES
*
tmqMsg
=
tmq_consumer_poll
(
tmq
,
g_stConfInfo
.
consumeDelay
*
1000
);
if
(
tmqMsg
)
{
totalMsgs
++
;
// printf("threadFlag: %d, totalMsgs: %d\n", threadLable, totalMsgs);
#if 0
TAOS_ROW row;
while (NULL != (row = tmq_get_row(tmqMsg))) {
totalRows++;
}
#endif
/*skipLogNum += tmqGetSkipLogNum(tmqMsg);*/
if
(
0
!=
g_stConfInfo
.
showMsgFlag
)
{
/*msg_process(tmqMsg);*/
}
tmq_message_destroy
(
tmqMsg
);
if
(
totalMsgs
>=
g_stConfInfo
.
consumeMsgCnt
)
{
if
(
totalMsgs
>=
pInfo
->
expectMsgCnt
)
{
break
;
}
}
else
{
...
...
@@ -403,137 +296,160 @@ int32_t parallel_consume(tmq_t* tmq, int threadLable) {
}
}
err
=
tmq_consumer_close
(
tmq
);
err
=
tmq_consumer_close
(
pInfo
->
tmq
);
if
(
err
)
{
printf
(
"tmq_consumer_close() fail, reason: %s
\n
"
,
tmq_err2str
(
err
));
exit
(
-
1
);
}
// printf("%d", totalMsgs); // output to sim for check result
return
totalMsgs
;
pInfo
->
consumeMsgCnt
=
totalMsgs
;
}
void
*
threadFunc
(
void
*
param
)
{
void
*
consumeThreadFunc
(
void
*
param
)
{
int32_t
totalMsgs
=
0
;
SThreadInfo
*
pInfo
=
(
SThreadInfo
*
)
param
;
SThreadInfo
*
pInfo
=
(
SThreadInfo
*
)
param
;
tmq_t
*
tmq
=
build_consumer_x
(
);
tmq_list_t
*
topic_list
=
build_topic_list_x
(
);
if
((
NULL
==
tmq
)
||
(
NULL
==
topic_list
))
{
build_consumer
(
pInfo
);
build_topic_list
(
pInfo
);
if
((
NULL
==
pInfo
->
tmq
)
||
(
NULL
==
pInfo
->
topicList
))
{
return
NULL
;
}
tmq_resp_err_t
err
=
tmq_subscribe
(
tmq
,
topic_l
ist
);
tmq_resp_err_t
err
=
tmq_subscribe
(
pInfo
->
tmq
,
pInfo
->
topicL
ist
);
if
(
err
)
{
printf
(
"tmq_subscribe() fail, reason: %s
\n
"
,
tmq_err2str
(
err
));
exit
(
-
1
);
}
// if (0 == g_stConfInfo.consumeMsgCnt) {
// loop_consume(tmq);
// } else {
pInfo
->
consumeMsgCnt
=
parallel_consume
(
tmq
,
1
);
//}
loop_consume
(
pInfo
);
err
=
tmq_unsubscribe
(
tmq
);
err
=
tmq_unsubscribe
(
pInfo
->
tmq
);
if
(
err
)
{
printf
(
"tmq_unsubscribe() fail, reason: %s
\n
"
,
tmq_err2str
(
err
));
pInfo
->
consumeMsgCnt
=
-
1
;
return
NULL
;
}
// save consume result into consumeresult table
saveConsumeResult
(
pInfo
);
return
NULL
;
}
int
main
(
int32_t
argc
,
char
*
argv
[])
{
parseArgument
(
argc
,
argv
);
parseInputString
();
void
parseConsumeInfo
()
{
char
*
token
;
const
char
delim
[
2
]
=
","
;
const
char
ch
=
':'
;
int32_t
numOfThreads
=
1
;
TdThreadAttr
thattr
;
taosThreadAttrInit
(
&
thattr
);
taosThreadAttrSetDetachState
(
&
thattr
,
PTHREAD_CREATE_JOINABLE
);
SThreadInfo
*
pInfo
=
(
SThreadInfo
*
)
taosMemoryCalloc
(
numOfThreads
,
sizeof
(
SThreadInfo
));
for
(
int32_t
i
=
0
;
i
<
g_stConfInfo
.
numOfThread
;
i
++
)
{
token
=
strtok
(
g_stConfInfo
.
stThreads
[
i
].
topicString
,
delim
);
while
(
token
!=
NULL
)
{
// printf("%s\n", token );
strcpy
(
g_stConfInfo
.
stThreads
[
i
].
topics
[
g_stConfInfo
.
stThreads
[
i
].
numOfTopic
],
token
);
ltrim
(
g_stConfInfo
.
stThreads
[
i
].
topics
[
g_stConfInfo
.
stThreads
[
i
].
numOfTopic
]);
// printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]);
g_stConfInfo
.
stThreads
[
i
].
numOfTopic
++
;
if
(
g_stConfInfo
.
numOfTopic1
)
{
// pthread_create one thread to consume
for
(
int32_t
i
=
0
;
i
<
numOfThreads
;
++
i
)
{
pInfo
[
i
].
expectMsgCnt
=
0
;
pInfo
[
i
].
consumeMsgCnt
=
0
;
taosThreadCreate
(
&
(
pInfo
[
i
].
thread
),
&
thattr
,
threadFunc
,
(
void
*
)(
pInfo
+
i
));
}
token
=
strtok
(
NULL
,
delim
);
}
int32_t
totalMsgs
=
0
;
tmq_t
*
tmq
=
build_consumer
();
tmq_list_t
*
topic_list
=
build_topic_list
();
if
((
NULL
==
tmq
)
||
(
NULL
==
topic_list
))
{
return
-
1
;
token
=
strtok
(
g_stConfInfo
.
stThreads
[
i
].
keyString
,
delim
);
while
(
token
!=
NULL
)
{
// printf("%s\n", token );
{
char
*
pstr
=
token
;
ltrim
(
pstr
);
char
*
ret
=
strchr
(
pstr
,
ch
);
memcpy
(
g_stConfInfo
.
stThreads
[
i
].
key
[
g_stConfInfo
.
stThreads
[
i
].
numOfKey
],
pstr
,
ret
-
pstr
);
strcpy
(
g_stConfInfo
.
stThreads
[
i
].
value
[
g_stConfInfo
.
stThreads
[
i
].
numOfKey
],
ret
+
1
);
// printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey],
// g_stConfInfo.value[g_stConfInfo.numOfKey]);
g_stConfInfo
.
stThreads
[
i
].
numOfKey
++
;
}
tmq_resp_err_t
err
=
tmq_subscribe
(
tmq
,
topic_list
);
if
(
err
)
{
printf
(
"tmq_subscribe() fail, reason: %s
\n
"
,
tmq_err2str
(
err
));
exit
(
-
1
);
token
=
strtok
(
NULL
,
delim
);
}
if
(
0
==
g_stConfInfo
.
numOfTopic1
)
{
loop_consume
(
tmq
);
}
else
{
totalMsgs
=
parallel_consume
(
tmq
,
0
);
}
}
err
=
tmq_unsubscribe
(
tmq
);
if
(
err
)
{
printf
(
"tmq_unsubscribe() fail, reason: %s
\n
"
,
tmq_err2str
(
err
));
int32_t
getConsumeInfo
()
{
char
sqlStr
[
1024
]
=
{
0
};
TAOS
*
pConn
=
taos_connect
(
NULL
,
"root"
,
"taosdata"
,
NULL
,
0
);
assert
(
pConn
!=
NULL
);
sprintf
(
sqlStr
,
"select * from %s.consumeinfo"
,
g_stConfInfo
.
dbName
);
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
sqlStr
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in get consumeinfo, reason:%s
\n
"
,
taos_errstr
(
pRes
));
taos_free_result
(
pRes
);
exit
(
-
1
);
}
if
(
g_stConfInfo
.
numOfTopic1
)
{
for
(
int32_t
i
=
0
;
i
<
numOfThreads
;
i
++
)
{
taosThreadJoin
(
pInfo
[
i
].
thread
,
NULL
);
}
TAOS_ROW
row
=
NULL
;
int
num_fields
=
taos_num_fields
(
pRes
);
TAOS_FIELD
*
fields
=
taos_fetch_fields
(
pRes
);
// printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt);
if
(
0
==
g_stConfInfo
.
checkMode
)
{
if
((
totalMsgs
+
pInfo
->
consumeMsgCnt
)
==
g_stConfInfo
.
consumeMsgCnt
)
{
printf
(
"success"
);
}
else
{
printf
(
"fail, consumer msg cnt: %d, %d"
,
totalMsgs
,
pInfo
->
consumeMsgCnt
);
// schema: ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int
int32_t
numOfThread
=
0
;
while
((
row
=
taos_fetch_row
(
pRes
)))
{
int32_t
*
lengths
=
taos_fetch_lengths
(
pRes
);
for
(
int
i
=
0
;
i
<
num_fields
;
++
i
)
{
if
(
row
[
i
]
==
NULL
||
0
==
i
)
{
continue
;
}
}
else
if
(
1
==
g_stConfInfo
.
checkMode
)
{
if
((
totalMsgs
==
g_stConfInfo
.
consumeMsgCnt
)
&&
(
pInfo
->
consumeMsgCnt
==
g_stConfInfo
.
consumeMsgCnt
))
{
printf
(
"success"
);
}
else
{
printf
(
"fail, consumer msg cnt: %d, %d"
,
totalMsgs
,
pInfo
->
consumeMsgCnt
);
if
((
1
==
i
)
&&
(
fields
[
i
].
type
==
TSDB_DATA_TYPE_INT
))
{
g_stConfInfo
.
stThreads
[
numOfThread
].
consumerId
=
*
((
int32_t
*
)
row
[
i
]);
}
else
if
((
2
==
i
)
&&
(
fields
[
i
].
type
==
TSDB_DATA_TYPE_BINARY
))
{
memcpy
(
g_stConfInfo
.
stThreads
[
numOfThread
].
topicString
,
row
[
i
],
lengths
[
i
]);
}
else
if
((
3
==
i
)
&&
(
fields
[
i
].
type
==
TSDB_DATA_TYPE_BINARY
))
{
memcpy
(
g_stConfInfo
.
stThreads
[
numOfThread
].
keyString
,
row
[
i
],
lengths
[
i
]);
}
else
if
((
4
==
i
)
&&
(
fields
[
i
].
type
==
TSDB_DATA_TYPE_BIGINT
))
{
g_stConfInfo
.
stThreads
[
numOfThread
].
expectMsgCnt
=
*
((
int64_t
*
)
row
[
i
]);
}
else
if
((
5
==
i
)
&&
(
fields
[
i
].
type
==
TSDB_DATA_TYPE_INT
))
{
g_stConfInfo
.
stThreads
[
numOfThread
].
ifCheckData
=
*
((
int32_t
*
)
row
[
i
]);
}
}
else
if
(
2
==
g_stConfInfo
.
checkMode
)
{
if
((
totalMsgs
+
pInfo
->
consumeMsgCnt
)
==
3
*
g_stConfInfo
.
consumeMsgCnt
)
{
printf
(
"success"
);
}
else
{
printf
(
"fail, consumer msg cnt: %d, %d"
,
totalMsgs
,
pInfo
->
consumeMsgCnt
);
}
}
else
if
(
3
==
g_stConfInfo
.
checkMode
)
{
if
((
totalMsgs
==
2
*
g_stConfInfo
.
consumeMsgCnt
)
&&
(
pInfo
->
consumeMsgCnt
==
2
*
g_stConfInfo
.
consumeMsgCnt
))
{
printf
(
"success"
);
}
else
{
printf
(
"fail, consumer msg cnt: %d, %d"
,
totalMsgs
,
pInfo
->
consumeMsgCnt
);
}
}
else
if
(
4
==
g_stConfInfo
.
checkMode
)
{
if
(((
totalMsgs
==
0
)
&&
(
pInfo
->
consumeMsgCnt
==
3
*
g_stConfInfo
.
consumeMsgCnt
))
||
((
pInfo
->
consumeMsgCnt
==
0
)
&&
(
totalMsgs
==
3
*
g_stConfInfo
.
consumeMsgCnt
))
||
((
pInfo
->
consumeMsgCnt
==
g_stConfInfo
.
consumeMsgCnt
)
&&
(
totalMsgs
==
2
*
g_stConfInfo
.
consumeMsgCnt
))
||
((
pInfo
->
consumeMsgCnt
==
2
*
g_stConfInfo
.
consumeMsgCnt
)
&&
(
totalMsgs
==
g_stConfInfo
.
consumeMsgCnt
)))
{
printf
(
"success"
);
}
else
{
printf
(
"fail, consumer msg cnt: %d, %d"
,
totalMsgs
,
pInfo
->
consumeMsgCnt
);
numOfThread
++
;
}
}
else
{
printf
(
"fail, check mode unknow. consumer msg cnt: %d, %d"
,
totalMsgs
,
pInfo
->
consumeMsgCnt
);
g_stConfInfo
.
numOfThread
=
numOfThread
;
taos_free_result
(
pRes
);
parseConsumeInfo
();
return
0
;
}
int
main
(
int32_t
argc
,
char
*
argv
[])
{
parseArgument
(
argc
,
argv
);
getConsumeInfo
();
initLogFile
();
TdThreadAttr
thattr
;
taosThreadAttrInit
(
&
thattr
);
taosThreadAttrSetDetachState
(
&
thattr
,
PTHREAD_CREATE_JOINABLE
);
// pthread_create one thread to consume
for
(
int32_t
i
=
0
;
i
<
g_stConfInfo
.
numOfThread
;
++
i
)
{
taosThreadCreate
(
&
(
g_stConfInfo
.
stThreads
[
i
].
thread
),
&
thattr
,
consumeThreadFunc
,
(
void
*
)(
&
(
g_stConfInfo
.
stThreads
[
i
])));
}
for
(
int32_t
i
=
0
;
i
<
g_stConfInfo
.
numOfThread
;
i
++
)
{
taosThreadJoin
(
g_stConfInfo
.
stThreads
[
i
].
thread
,
NULL
);
}
//printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt);
taosFprintfFile
(
g_fp
,
"
\n
"
);
taosCloseFile
(
&
g_fp
);
return
0
;
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录