Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
2ae4f5fa
T
TDengine
项目概览
taosdata
/
TDengine
接近 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
2ae4f5fa
编写于
4月 20, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/3.0' into feature/check
上级
64b0c3a0
d0b9b907
变更
20
展开全部
隐藏空白更改
内联
并排
Showing
20 changed file
with
3602 addition
and
3238 deletion
+3602
-3238
include/common/tmsg.h
include/common/tmsg.h
+6
-0
include/common/trow.h
include/common/trow.h
+2
-2
include/common/ttokendef.h
include/common/ttokendef.h
+150
-149
include/libs/nodes/cmdnodes.h
include/libs/nodes/cmdnodes.h
+1
-0
include/util/tdef.h
include/util/tdef.h
+6
-6
source/common/src/tmsg.c
source/common/src/tmsg.c
+49
-0
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+4
-0
source/dnode/mnode/impl/src/mndDb.c
source/dnode/mnode/impl/src/mndDb.c
+5
-3
source/dnode/mnode/impl/src/mndStb.c
source/dnode/mnode/impl/src/mndStb.c
+73
-1
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+18
-14
source/libs/parser/inc/parAst.h
source/libs/parser/inc/parAst.h
+1
-0
source/libs/parser/inc/sql.y
source/libs/parser/inc/sql.y
+3
-1
source/libs/parser/src/parAstCreater.c
source/libs/parser/src/parAstCreater.c
+0
-3
source/libs/parser/src/parTokenizer.c
source/libs/parser/src/parTokenizer.c
+1
-0
source/libs/parser/src/parTranslater.c
source/libs/parser/src/parTranslater.c
+351
-137
source/libs/parser/src/sql.c
source/libs/parser/src/sql.c
+2555
-2541
source/libs/scalar/src/sclfunc.c
source/libs/scalar/src/sclfunc.c
+32
-55
tests/script/jenkins/basic.txt
tests/script/jenkins/basic.txt
+1
-1
tests/script/tsim/tmq/consume.sh
tests/script/tsim/tmq/consume.sh
+103
-0
tests/test/c/tmqSim.c
tests/test/c/tmqSim.c
+241
-325
未找到文件。
include/common/tmsg.h
浏览文件 @
2ae4f5fa
...
...
@@ -280,10 +280,14 @@ typedef struct {
int32_t
numOfTags
;
int32_t
numOfSmas
;
int32_t
commentLen
;
int32_t
ast1Len
;
int32_t
ast2Len
;
SArray
*
pColumns
;
// array of SField
SArray
*
pTags
;
// array of SField
SArray
*
pSmas
;
// array of SField
char
*
comment
;
char
*
pAst1
;
char
*
pAst2
;
}
SMCreateStbReq
;
int32_t
tSerializeSMCreateStbReq
(
void
*
buf
,
int32_t
bufLen
,
SMCreateStbReq
*
pReq
);
...
...
@@ -609,6 +613,8 @@ typedef struct {
int8_t
cacheLastRow
;
int8_t
streamMode
;
int8_t
singleSTable
;
int32_t
numOfRetensions
;
SArray
*
pRetensions
;
}
SDbCfgRsp
;
int32_t
tSerializeSDbCfgRsp
(
void
*
buf
,
int32_t
bufLen
,
const
SDbCfgRsp
*
pRsp
);
...
...
include/common/trow.h
浏览文件 @
2ae4f5fa
...
...
@@ -308,8 +308,8 @@ static FORCE_INLINE int32_t tdSetBitmapValTypeII(void *pBitmap, int16_t colIdx,
// use literal value directly and not use formula to simplify the codes
switch
(
nOffset
)
{
case
0
:
*
pDestByte
=
((
*
pDestByte
)
&
0x3F
)
|
(
valType
<<
6
);
// set the value and clear other partitions for offset 0
*
pDestByte
=
(
valType
<<
6
);
// *pDestByte |= (valType << 6);
break
;
case
1
:
...
...
@@ -417,8 +417,8 @@ static FORCE_INLINE int32_t tdSetBitmapValTypeI(void *pBitmap, int16_t colIdx, T
// use literal value directly and not use formula to simplify the codes
switch
(
nOffset
)
{
case
0
:
*
pDestByte
=
((
*
pDestByte
)
&
0x7F
)
|
(
valType
<<
7
);
// set the value and clear other partitions for offset 0
*
pDestByte
=
(
valType
<<
7
);
// *pDestByte |= (valType << 7);
break
;
case
1
:
...
...
include/common/ttokendef.h
浏览文件 @
2ae4f5fa
...
...
@@ -86,155 +86,156 @@
#define TK_SINGLE_STABLE 68
#define TK_STREAM_MODE 69
#define TK_RETENTIONS 70
#define TK_NK_COMMA 71
#define TK_NK_COLON 72
#define TK_TABLE 73
#define TK_NK_LP 74
#define TK_NK_RP 75
#define TK_STABLE 76
#define TK_ADD 77
#define TK_COLUMN 78
#define TK_MODIFY 79
#define TK_RENAME 80
#define TK_TAG 81
#define TK_SET 82
#define TK_NK_EQ 83
#define TK_USING 84
#define TK_TAGS 85
#define TK_NK_DOT 86
#define TK_COMMENT 87
#define TK_BOOL 88
#define TK_TINYINT 89
#define TK_SMALLINT 90
#define TK_INT 91
#define TK_INTEGER 92
#define TK_BIGINT 93
#define TK_FLOAT 94
#define TK_DOUBLE 95
#define TK_BINARY 96
#define TK_TIMESTAMP 97
#define TK_NCHAR 98
#define TK_UNSIGNED 99
#define TK_JSON 100
#define TK_VARCHAR 101
#define TK_MEDIUMBLOB 102
#define TK_BLOB 103
#define TK_VARBINARY 104
#define TK_DECIMAL 105
#define TK_SMA 106
#define TK_ROLLUP 107
#define TK_FILE_FACTOR 108
#define TK_NK_FLOAT 109
#define TK_DELAY 110
#define TK_SHOW 111
#define TK_DATABASES 112
#define TK_TABLES 113
#define TK_STABLES 114
#define TK_MNODES 115
#define TK_MODULES 116
#define TK_QNODES 117
#define TK_FUNCTIONS 118
#define TK_INDEXES 119
#define TK_FROM 120
#define TK_ACCOUNTS 121
#define TK_APPS 122
#define TK_CONNECTIONS 123
#define TK_LICENCE 124
#define TK_GRANTS 125
#define TK_QUERIES 126
#define TK_SCORES 127
#define TK_TOPICS 128
#define TK_VARIABLES 129
#define TK_BNODES 130
#define TK_SNODES 131
#define TK_LIKE 132
#define TK_INDEX 133
#define TK_FULLTEXT 134
#define TK_FUNCTION 135
#define TK_INTERVAL 136
#define TK_TOPIC 137
#define TK_AS 138
#define TK_DESC 139
#define TK_DESCRIBE 140
#define TK_RESET 141
#define TK_QUERY 142
#define TK_EXPLAIN 143
#define TK_ANALYZE 144
#define TK_VERBOSE 145
#define TK_NK_BOOL 146
#define TK_RATIO 147
#define TK_COMPACT 148
#define TK_VNODES 149
#define TK_IN 150
#define TK_OUTPUTTYPE 151
#define TK_AGGREGATE 152
#define TK_BUFSIZE 153
#define TK_STREAM 154
#define TK_INTO 155
#define TK_TRIGGER 156
#define TK_AT_ONCE 157
#define TK_WINDOW_CLOSE 158
#define TK_WATERMARK 159
#define TK_KILL 160
#define TK_CONNECTION 161
#define TK_MERGE 162
#define TK_VGROUP 163
#define TK_REDISTRIBUTE 164
#define TK_SPLIT 165
#define TK_SYNCDB 166
#define TK_NULL 167
#define TK_NK_QUESTION 168
#define TK_NK_ARROW 169
#define TK_ROWTS 170
#define TK_TBNAME 171
#define TK_QSTARTTS 172
#define TK_QENDTS 173
#define TK_WSTARTTS 174
#define TK_WENDTS 175
#define TK_WDURATION 176
#define TK_CAST 177
#define TK_NOW 178
#define TK_TODAY 179
#define TK_TIMEZONE 180
#define TK_COUNT 181
#define TK_FIRST 182
#define TK_LAST 183
#define TK_LAST_ROW 184
#define TK_BETWEEN 185
#define TK_IS 186
#define TK_NK_LT 187
#define TK_NK_GT 188
#define TK_NK_LE 189
#define TK_NK_GE 190
#define TK_NK_NE 191
#define TK_MATCH 192
#define TK_NMATCH 193
#define TK_CONTAINS 194
#define TK_JOIN 195
#define TK_INNER 196
#define TK_SELECT 197
#define TK_DISTINCT 198
#define TK_WHERE 199
#define TK_PARTITION 200
#define TK_BY 201
#define TK_SESSION 202
#define TK_STATE_WINDOW 203
#define TK_SLIDING 204
#define TK_FILL 205
#define TK_VALUE 206
#define TK_NONE 207
#define TK_PREV 208
#define TK_LINEAR 209
#define TK_NEXT 210
#define TK_GROUP 211
#define TK_HAVING 212
#define TK_ORDER 213
#define TK_SLIMIT 214
#define TK_SOFFSET 215
#define TK_LIMIT 216
#define TK_OFFSET 217
#define TK_ASC 218
#define TK_NULLS 219
#define TK_STRICT 71
#define TK_NK_COMMA 72
#define TK_NK_COLON 73
#define TK_TABLE 74
#define TK_NK_LP 75
#define TK_NK_RP 76
#define TK_STABLE 77
#define TK_ADD 78
#define TK_COLUMN 79
#define TK_MODIFY 80
#define TK_RENAME 81
#define TK_TAG 82
#define TK_SET 83
#define TK_NK_EQ 84
#define TK_USING 85
#define TK_TAGS 86
#define TK_NK_DOT 87
#define TK_COMMENT 88
#define TK_BOOL 89
#define TK_TINYINT 90
#define TK_SMALLINT 91
#define TK_INT 92
#define TK_INTEGER 93
#define TK_BIGINT 94
#define TK_FLOAT 95
#define TK_DOUBLE 96
#define TK_BINARY 97
#define TK_TIMESTAMP 98
#define TK_NCHAR 99
#define TK_UNSIGNED 100
#define TK_JSON 101
#define TK_VARCHAR 102
#define TK_MEDIUMBLOB 103
#define TK_BLOB 104
#define TK_VARBINARY 105
#define TK_DECIMAL 106
#define TK_SMA 107
#define TK_ROLLUP 108
#define TK_FILE_FACTOR 109
#define TK_NK_FLOAT 110
#define TK_DELAY 111
#define TK_SHOW 112
#define TK_DATABASES 113
#define TK_TABLES 114
#define TK_STABLES 115
#define TK_MNODES 116
#define TK_MODULES 117
#define TK_QNODES 118
#define TK_FUNCTIONS 119
#define TK_INDEXES 120
#define TK_FROM 121
#define TK_ACCOUNTS 122
#define TK_APPS 123
#define TK_CONNECTIONS 124
#define TK_LICENCE 125
#define TK_GRANTS 126
#define TK_QUERIES 127
#define TK_SCORES 128
#define TK_TOPICS 129
#define TK_VARIABLES 130
#define TK_BNODES 131
#define TK_SNODES 132
#define TK_LIKE 133
#define TK_INDEX 134
#define TK_FULLTEXT 135
#define TK_FUNCTION 136
#define TK_INTERVAL 137
#define TK_TOPIC 138
#define TK_AS 139
#define TK_DESC 140
#define TK_DESCRIBE 141
#define TK_RESET 142
#define TK_QUERY 143
#define TK_EXPLAIN 144
#define TK_ANALYZE 145
#define TK_VERBOSE 146
#define TK_NK_BOOL 147
#define TK_RATIO 148
#define TK_COMPACT 149
#define TK_VNODES 150
#define TK_IN 151
#define TK_OUTPUTTYPE 152
#define TK_AGGREGATE 153
#define TK_BUFSIZE 154
#define TK_STREAM 155
#define TK_INTO 156
#define TK_TRIGGER 157
#define TK_AT_ONCE 158
#define TK_WINDOW_CLOSE 159
#define TK_WATERMARK 160
#define TK_KILL 161
#define TK_CONNECTION 162
#define TK_MERGE 163
#define TK_VGROUP 164
#define TK_REDISTRIBUTE 165
#define TK_SPLIT 166
#define TK_SYNCDB 167
#define TK_NULL 168
#define TK_NK_QUESTION 169
#define TK_NK_ARROW 170
#define TK_ROWTS 171
#define TK_TBNAME 172
#define TK_QSTARTTS 173
#define TK_QENDTS 174
#define TK_WSTARTTS 175
#define TK_WENDTS 176
#define TK_WDURATION 177
#define TK_CAST 178
#define TK_NOW 179
#define TK_TODAY 180
#define TK_TIMEZONE 181
#define TK_COUNT 182
#define TK_FIRST 183
#define TK_LAST 184
#define TK_LAST_ROW 185
#define TK_BETWEEN 186
#define TK_IS 187
#define TK_NK_LT 188
#define TK_NK_GT 189
#define TK_NK_LE 190
#define TK_NK_GE 191
#define TK_NK_NE 192
#define TK_MATCH 193
#define TK_NMATCH 194
#define TK_CONTAINS 195
#define TK_JOIN 196
#define TK_INNER 197
#define TK_SELECT 198
#define TK_DISTINCT 199
#define TK_WHERE 200
#define TK_PARTITION 201
#define TK_BY 202
#define TK_SESSION 203
#define TK_STATE_WINDOW 204
#define TK_SLIDING 205
#define TK_FILL 206
#define TK_VALUE 207
#define TK_NONE 208
#define TK_PREV 209
#define TK_LINEAR 210
#define TK_NEXT 211
#define TK_GROUP 212
#define TK_HAVING 213
#define TK_ORDER 214
#define TK_SLIMIT 215
#define TK_SOFFSET 216
#define TK_LIMIT 217
#define TK_OFFSET 218
#define TK_ASC 219
#define TK_NULLS 220
#define TK_NK_SPACE 300
#define TK_NK_COMMENT 301
...
...
include/libs/nodes/cmdnodes.h
浏览文件 @
2ae4f5fa
...
...
@@ -47,6 +47,7 @@ typedef struct SDatabaseOptions {
SValueNode
*
pNumOfVgroups
;
SValueNode
*
pSingleStable
;
SValueNode
*
pStreamMode
;
SValueNode
*
pStrict
;
SNodeList
*
pRetentions
;
}
SDatabaseOptions
;
...
...
include/util/tdef.h
浏览文件 @
2ae4f5fa
...
...
@@ -359,8 +359,8 @@ typedef enum ELogicConditionType {
#define TSDB_MIN_DB_REPLICA 1
#define TSDB_MAX_DB_REPLICA 3
#define TSDB_DEFAULT_DB_REPLICA 1
#define TSDB_
MIN_DB_STRICT
0
#define TSDB_
MAX_DB_STRICT
1
#define TSDB_
DB_STRICT_OFF
0
#define TSDB_
DB_STRICT_ON
1
#define TSDB_DEFAULT_DB_STRICT 0
#define TSDB_MIN_DB_UPDATE 0
#define TSDB_MAX_DB_UPDATE 2
...
...
@@ -368,11 +368,11 @@ typedef enum ELogicConditionType {
#define TSDB_MIN_DB_CACHE_LAST_ROW 0
#define TSDB_MAX_DB_CACHE_LAST_ROW 3
#define TSDB_DEFAULT_CACHE_LAST_ROW 0
#define TSDB_
MIN_DB_STREAM_MODE
0
#define TSDB_
MAX_DB_STREAM_MODE
1
#define TSDB_
DB_STREAM_MODE_OFF
0
#define TSDB_
DB_STREAM_MODE_ON
1
#define TSDB_DEFAULT_DB_STREAM_MODE 0
#define TSDB_
MIN_DB_SINGLE_STABLE
0
#define TSDB_
MAX_DB_SINGLE_STABLE
1
#define TSDB_
DB_SINGLE_STABLE_ON
0
#define TSDB_
DB_SINGLE_STABLE_OFF
1
#define TSDB_DEFAULT_DB_SINGLE_STABLE 0
#define TSDB_MIN_DB_FILE_FACTOR 0
...
...
source/common/src/tmsg.c
浏览文件 @
2ae4f5fa
...
...
@@ -611,6 +611,8 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq
if
(
tEncodeI32
(
&
encoder
,
pReq
->
numOfTags
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
numOfSmas
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
commentLen
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
ast1Len
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
ast2Len
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
pReq
->
numOfColumns
;
++
i
)
{
SField
*
pField
=
taosArrayGet
(
pReq
->
pColumns
,
i
);
...
...
@@ -636,6 +638,12 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq
if
(
pReq
->
commentLen
>
0
)
{
if
(
tEncodeBinary
(
&
encoder
,
pReq
->
comment
,
pReq
->
commentLen
)
<
0
)
return
-
1
;
}
if
(
pReq
->
ast1Len
>
0
)
{
if
(
tEncodeBinary
(
&
encoder
,
pReq
->
pAst1
,
pReq
->
ast1Len
)
<
0
)
return
-
1
;
}
if
(
pReq
->
ast2Len
>
0
)
{
if
(
tEncodeBinary
(
&
encoder
,
pReq
->
pAst2
,
pReq
->
ast2Len
)
<
0
)
return
-
1
;
}
tEndEncode
(
&
encoder
);
int32_t
tlen
=
encoder
.
pos
;
...
...
@@ -658,6 +666,8 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
numOfTags
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
numOfSmas
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
commentLen
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
ast1Len
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
ast2Len
)
<
0
)
return
-
1
;
pReq
->
pColumns
=
taosArrayInit
(
pReq
->
numOfColumns
,
sizeof
(
SField
));
pReq
->
pTags
=
taosArrayInit
(
pReq
->
numOfTags
,
sizeof
(
SField
));
...
...
@@ -706,6 +716,18 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
comment
)
<
0
)
return
-
1
;
}
if
(
pReq
->
ast1Len
>
0
)
{
pReq
->
pAst1
=
taosMemoryMalloc
(
pReq
->
ast1Len
);
if
(
pReq
->
pAst1
==
NULL
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
pAst1
)
<
0
)
return
-
1
;
}
if
(
pReq
->
ast2Len
>
0
)
{
pReq
->
pAst2
=
taosMemoryMalloc
(
pReq
->
ast2Len
);
if
(
pReq
->
pAst2
==
NULL
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
pAst2
)
<
0
)
return
-
1
;
}
tEndDecode
(
&
decoder
);
tCoderClear
(
&
decoder
);
...
...
@@ -717,6 +739,8 @@ void tFreeSMCreateStbReq(SMCreateStbReq *pReq) {
taosArrayDestroy
(
pReq
->
pTags
);
taosArrayDestroy
(
pReq
->
pSmas
);
taosMemoryFreeClear
(
pReq
->
comment
);
taosMemoryFreeClear
(
pReq
->
pAst1
);
taosMemoryFreeClear
(
pReq
->
pAst2
);
pReq
->
pColumns
=
NULL
;
pReq
->
pTags
=
NULL
;
pReq
->
pSmas
=
NULL
;
...
...
@@ -2211,6 +2235,14 @@ int32_t tSerializeSDbCfgRsp(void *buf, int32_t bufLen, const SDbCfgRsp *pRsp) {
if
(
tEncodeI8
(
&
encoder
,
pRsp
->
update
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pRsp
->
cacheLastRow
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pRsp
->
streamMode
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pRsp
->
numOfRetensions
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
pRsp
->
numOfRetensions
;
++
i
)
{
SRetention
*
pRetension
=
taosArrayGet
(
pRsp
->
pRetensions
,
i
);
if
(
tEncodeI32
(
&
encoder
,
pRetension
->
freq
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pRetension
->
keep
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pRetension
->
freqUnit
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pRetension
->
keepUnit
)
<
0
)
return
-
1
;
}
tEndEncode
(
&
encoder
);
int32_t
tlen
=
encoder
.
pos
;
...
...
@@ -2242,7 +2274,24 @@ int32_t tDeserializeSDbCfgRsp(void *buf, int32_t bufLen, SDbCfgRsp *pRsp) {
if
(
tDecodeI8
(
&
decoder
,
&
pRsp
->
update
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pRsp
->
cacheLastRow
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pRsp
->
streamMode
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pRsp
->
numOfRetensions
)
<
0
)
return
-
1
;
pRsp
->
pRetensions
=
taosArrayInit
(
pRsp
->
numOfRetensions
,
sizeof
(
SRetention
));
if
(
pRsp
->
pRetensions
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
for
(
int32_t
i
=
0
;
i
<
pRsp
->
numOfRetensions
;
++
i
)
{
SRetention
rentension
=
{
0
};
if
(
tDecodeI32
(
&
decoder
,
&
rentension
.
freq
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
rentension
.
keep
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
rentension
.
freqUnit
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
rentension
.
keepUnit
)
<
0
)
return
-
1
;
if
(
taosArrayPush
(
pRsp
->
pRetensions
,
&
rentension
)
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
}
tEndDecode
(
&
decoder
);
tCoderClear
(
&
decoder
);
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
2ae4f5fa
...
...
@@ -355,10 +355,14 @@ typedef struct {
int32_t
numOfTags
;
int32_t
numOfSmas
;
int32_t
commentLen
;
int32_t
ast1Len
;
int32_t
ast2Len
;
SSchema
*
pColumns
;
SSchema
*
pTags
;
SSchema
*
pSmas
;
char
*
comment
;
char
*
pAst1
;
char
*
pAst2
;
SRWLatch
lock
;
}
SStbObj
;
...
...
source/dnode/mnode/impl/src/mndDb.c
浏览文件 @
2ae4f5fa
...
...
@@ -286,12 +286,12 @@ static int32_t mndCheckDbCfg(SMnode *pMnode, SDbCfg *pCfg) {
if
(
pCfg
->
compression
<
TSDB_MIN_COMP_LEVEL
||
pCfg
->
compression
>
TSDB_MAX_COMP_LEVEL
)
return
-
1
;
if
(
pCfg
->
replications
<
TSDB_MIN_DB_REPLICA
||
pCfg
->
replications
>
TSDB_MAX_DB_REPLICA
)
return
-
1
;
if
(
pCfg
->
replications
>
mndGetDnodeSize
(
pMnode
))
return
-
1
;
if
(
pCfg
->
strict
<
TSDB_
MIN_DB_STRICT
||
pCfg
->
strict
>
TSDB_MAX_DB_STRICT
)
return
-
1
;
if
(
pCfg
->
strict
<
TSDB_
DB_STRICT_OFF
||
pCfg
->
strict
>
TSDB_DB_STRICT_ON
)
return
-
1
;
if
(
pCfg
->
strict
>
pCfg
->
replications
)
return
-
1
;
if
(
pCfg
->
update
<
TSDB_MIN_DB_UPDATE
||
pCfg
->
update
>
TSDB_MAX_DB_UPDATE
)
return
-
1
;
if
(
pCfg
->
cacheLastRow
<
TSDB_MIN_DB_CACHE_LAST_ROW
||
pCfg
->
cacheLastRow
>
TSDB_MAX_DB_CACHE_LAST_ROW
)
return
-
1
;
if
(
pCfg
->
streamMode
<
TSDB_
MIN_DB_STREAM_MODE
||
pCfg
->
streamMode
>
TSDB_MAX_DB_STREAM_MODE
)
return
-
1
;
if
(
pCfg
->
singleSTable
<
TSDB_
MIN_DB_SINGLE_STABLE
||
pCfg
->
streamMode
>
TSDB_MAX_DB_SINGLE_STABLE
)
return
-
1
;
if
(
pCfg
->
streamMode
<
TSDB_
DB_STREAM_MODE_OFF
||
pCfg
->
streamMode
>
TSDB_DB_STREAM_MODE_ON
)
return
-
1
;
if
(
pCfg
->
singleSTable
<
TSDB_
DB_SINGLE_STABLE_ON
||
pCfg
->
streamMode
>
TSDB_DB_SINGLE_STABLE_OFF
)
return
-
1
;
if
(
pCfg
->
hashMethod
!=
1
)
return
-
1
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -860,6 +860,8 @@ static int32_t mndProcessGetDbCfgReq(SNodeMsg *pReq) {
pReq
->
pRsp
=
pRsp
;
pReq
->
rspLen
=
contLen
;
code
=
0
;
GET_DB_CFG_OVER:
if
(
code
!=
0
&&
code
!=
TSDB_CODE_MND_ACTION_IN_PROGRESS
)
{
...
...
source/dnode/mnode/impl/src/mndStb.c
浏览文件 @
2ae4f5fa
...
...
@@ -72,7 +72,7 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
int32_t
size
=
sizeof
(
SStbObj
)
+
(
pStb
->
numOfColumns
+
pStb
->
numOfTags
+
pStb
->
numOfSmas
)
*
sizeof
(
SSchema
)
+
TSDB_STB_RESERVE_SIZE
;
+
pStb
->
commentLen
+
pStb
->
ast1Len
+
pStb
->
ast2Len
+
TSDB_STB_RESERVE_SIZE
;
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_STB
,
TSDB_STB_VER_NUMBER
,
size
);
if
(
pRaw
==
NULL
)
goto
_OVER
;
...
...
@@ -93,6 +93,8 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
SDB_SET_INT32
(
pRaw
,
dataPos
,
pStb
->
numOfTags
,
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pStb
->
numOfSmas
,
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pStb
->
commentLen
,
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pStb
->
ast1Len
,
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pStb
->
ast2Len
,
_OVER
)
for
(
int32_t
i
=
0
;
i
<
pStb
->
numOfColumns
;
++
i
)
{
SSchema
*
pSchema
=
&
pStb
->
pColumns
[
i
];
...
...
@@ -121,6 +123,12 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
if
(
pStb
->
commentLen
>
0
)
{
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pStb
->
comment
,
pStb
->
commentLen
,
_OVER
)
}
if
(
pStb
->
ast1Len
>
0
)
{
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pStb
->
pAst1
,
pStb
->
ast1Len
,
_OVER
)
}
if
(
pStb
->
ast2Len
>
0
)
{
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pStb
->
pAst2
,
pStb
->
ast2Len
,
_OVER
)
}
SDB_SET_RESERVE
(
pRaw
,
dataPos
,
TSDB_STB_RESERVE_SIZE
,
_OVER
)
SDB_SET_DATALEN
(
pRaw
,
dataPos
,
_OVER
)
...
...
@@ -173,6 +181,8 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
pStb
->
numOfTags
,
_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
pStb
->
numOfSmas
,
_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
pStb
->
commentLen
,
_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
pStb
->
ast1Len
,
_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
pStb
->
ast2Len
,
_OVER
)
pStb
->
pColumns
=
taosMemoryCalloc
(
pStb
->
numOfColumns
,
sizeof
(
SSchema
));
pStb
->
pTags
=
taosMemoryCalloc
(
pStb
->
numOfTags
,
sizeof
(
SSchema
));
...
...
@@ -210,6 +220,16 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
if
(
pStb
->
comment
==
NULL
)
goto
_OVER
;
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pStb
->
comment
,
pStb
->
commentLen
,
_OVER
)
}
if
(
pStb
->
ast1Len
>
0
)
{
pStb
->
pAst1
=
taosMemoryCalloc
(
pStb
->
ast1Len
,
1
);
if
(
pStb
->
pAst1
==
NULL
)
goto
_OVER
;
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pStb
->
pAst1
,
pStb
->
ast1Len
,
_OVER
)
}
if
(
pStb
->
ast2Len
>
0
)
{
pStb
->
pAst2
=
taosMemoryCalloc
(
pStb
->
ast2Len
,
1
);
if
(
pStb
->
pAst2
==
NULL
)
goto
_OVER
;
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pStb
->
pAst2
,
pStb
->
ast2Len
,
_OVER
)
}
SDB_GET_RESERVE
(
pRaw
,
dataPos
,
TSDB_STB_RESERVE_SIZE
,
_OVER
)
terrno
=
0
;
...
...
@@ -238,6 +258,8 @@ static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb) {
taosMemoryFreeClear
(
pStb
->
pColumns
);
taosMemoryFreeClear
(
pStb
->
pTags
);
taosMemoryFreeClear
(
pStb
->
comment
);
taosMemoryFreeClear
(
pStb
->
pAst1
);
taosMemoryFreeClear
(
pStb
->
pAst2
);
return
0
;
}
...
...
@@ -294,6 +316,30 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) {
}
}
if
(
pOld
->
ast1Len
<
pNew
->
ast1Len
)
{
void
*
pAst1
=
taosMemoryMalloc
(
pNew
->
ast1Len
);
if
(
pAst1
!=
NULL
)
{
taosMemoryFree
(
pOld
->
pAst1
);
pOld
->
pAst1
=
pAst1
;
}
else
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
mTrace
(
"stb:%s, failed to perform update action since %s"
,
pOld
->
name
,
terrstr
());
taosWUnLockLatch
(
&
pOld
->
lock
);
}
}
if
(
pOld
->
ast2Len
<
pNew
->
ast2Len
)
{
void
*
pAst2
=
taosMemoryMalloc
(
pNew
->
ast2Len
);
if
(
pAst2
!=
NULL
)
{
taosMemoryFree
(
pOld
->
pAst2
);
pOld
->
pAst2
=
pAst2
;
}
else
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
mTrace
(
"stb:%s, failed to perform update action since %s"
,
pOld
->
name
,
terrstr
());
taosWUnLockLatch
(
&
pOld
->
lock
);
}
}
pOld
->
updateTime
=
pNew
->
updateTime
;
pOld
->
version
=
pNew
->
version
;
pOld
->
nextColId
=
pNew
->
nextColId
;
...
...
@@ -304,6 +350,12 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) {
if
(
pNew
->
commentLen
!=
0
)
{
memcpy
(
pOld
->
comment
,
pNew
->
comment
,
TSDB_STB_COMMENT_LEN
);
}
if
(
pNew
->
ast1Len
!=
0
)
{
memcpy
(
pOld
->
pAst1
,
pNew
->
pAst1
,
pNew
->
ast1Len
);
}
if
(
pNew
->
ast2Len
!=
0
)
{
memcpy
(
pOld
->
pAst2
,
pNew
->
pAst2
,
pNew
->
ast2Len
);
}
taosWUnLockLatch
(
&
pOld
->
lock
);
return
0
;
}
...
...
@@ -646,6 +698,26 @@ static int32_t mndCreateStb(SMnode *pMnode, SNodeMsg *pReq, SMCreateStbReq *pCre
memcpy
(
stbObj
.
comment
,
pCreate
->
comment
,
stbObj
.
commentLen
);
}
stbObj
.
ast1Len
=
pCreate
->
ast1Len
;
if
(
stbObj
.
ast1Len
>
0
)
{
stbObj
.
pAst1
=
taosMemoryCalloc
(
stbObj
.
ast1Len
,
1
);
if
(
stbObj
.
pAst1
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
memcpy
(
stbObj
.
pAst1
,
pCreate
->
pAst1
,
stbObj
.
ast1Len
);
}
stbObj
.
ast2Len
=
pCreate
->
ast2Len
;
if
(
stbObj
.
ast2Len
>
0
)
{
stbObj
.
pAst2
=
taosMemoryCalloc
(
stbObj
.
ast2Len
,
1
);
if
(
stbObj
.
pAst2
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
memcpy
(
stbObj
.
pAst2
,
pCreate
->
pAst2
,
stbObj
.
ast2Len
);
}
stbObj
.
pColumns
=
taosMemoryMalloc
(
stbObj
.
numOfColumns
*
sizeof
(
SSchema
));
stbObj
.
pTags
=
taosMemoryMalloc
(
stbObj
.
numOfTags
*
sizeof
(
SSchema
));
stbObj
.
pSmas
=
taosMemoryMalloc
(
stbObj
.
numOfSmas
*
sizeof
(
SSchema
));
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
2ae4f5fa
...
...
@@ -1410,30 +1410,33 @@ static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t
if
(
!
isAllRowsNull
(
src
)
&&
pColInfo
->
info
.
colId
==
src
->
colId
)
{
if
(
!
IS_VAR_DATA_TYPE
(
pColInfo
->
info
.
type
))
{
// todo opt performance
// memmove(pData, (char*)src->pData + bytes * start, bytes * num);
for
(
int32_t
k
=
start
;
k
<
num
+
start
;
++
k
)
{
int32_t
rowIndex
=
numOfRows
;
for
(
int32_t
k
=
start
;
k
<=
end
;
++
k
,
++
rowIndex
)
{
SCellVal
sVal
=
{
0
};
if
(
tdGetColDataOfRow
(
&
sVal
,
src
,
k
,
pCols
->
bitmapMode
)
<
0
)
{
TASSERT
(
0
);
}
if
(
sVal
.
valType
==
TD_VTYPE_NULL
)
{
colDataAppendNULL
(
pColInfo
,
k
);
colDataAppendNULL
(
pColInfo
,
rowIndex
);
}
else
{
colDataAppend
(
pColInfo
,
k
,
sVal
.
val
,
false
);
colDataAppend
(
pColInfo
,
rowIndex
,
sVal
.
val
,
false
);
}
}
}
else
{
// handle the var-string
int32_t
rowIndex
=
numOfRows
;
// todo refactor, only copy one-by-one
for
(
int32_t
k
=
start
;
k
<
num
+
start
;
++
k
)
{
for
(
int32_t
k
=
start
;
k
<
num
+
start
;
++
k
,
++
rowIndex
)
{
SCellVal
sVal
=
{
0
};
if
(
tdGetColDataOfRow
(
&
sVal
,
src
,
k
,
pCols
->
bitmapMode
)
<
0
)
{
TASSERT
(
0
);
}
if
(
sVal
.
valType
==
TD_VTYPE_NULL
)
{
colDataAppendNULL
(
pColInfo
,
k
);
colDataAppendNULL
(
pColInfo
,
rowIndex
);
}
else
{
colDataAppend
(
pColInfo
,
k
,
sVal
.
val
,
false
);
colDataAppend
(
pColInfo
,
rowIndex
,
sVal
.
val
,
false
);
}
}
}
...
...
@@ -1441,8 +1444,9 @@ static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t
j
++
;
i
++
;
}
else
{
// pColInfo->info.colId < src->colId, it is a NULL data
for
(
int32_t
k
=
start
;
k
<
num
+
start
;
++
k
)
{
// TODO opt performance
colDataAppend
(
pColInfo
,
k
,
NULL
,
true
);
int32_t
rowIndex
=
numOfRows
;
for
(
int32_t
k
=
start
;
k
<
num
+
start
;
++
k
,
++
rowIndex
)
{
// TODO opt performance
colDataAppend
(
pColInfo
,
rowIndex
,
NULL
,
true
);
}
i
++
;
}
...
...
@@ -1450,8 +1454,10 @@ static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t
while
(
i
<
requiredNumOfCols
)
{
// the remain columns are all null data
SColumnInfoData
*
pColInfo
=
taosArrayGet
(
pTsdbReadHandle
->
pColumns
,
i
);
for
(
int32_t
k
=
start
;
k
<
num
+
start
;
++
k
)
{
colDataAppend
(
pColInfo
,
k
,
NULL
,
true
);
// TODO add a fast version to set a number of consecutive NULL value.
int32_t
rowIndex
=
numOfRows
;
for
(
int32_t
k
=
start
;
k
<
num
+
start
;
++
k
,
++
rowIndex
)
{
colDataAppend
(
pColInfo
,
rowIndex
,
NULL
,
true
);
// TODO add a fast version to set a number of consecutive NULL value.
}
i
++
;
}
...
...
@@ -1749,7 +1755,7 @@ int32_t getEndPosInDataBlock(STsdbReadHandle* pTsdbReadHandle, SDataBlockInfo* p
// be included in the query time window will be discarded
static
void
doMergeTwoLevelData
(
STsdbReadHandle
*
pTsdbReadHandle
,
STableCheckInfo
*
pCheckInfo
,
SBlock
*
pBlock
)
{
SQueryFilePos
*
cur
=
&
pTsdbReadHandle
->
cur
;
SDataBlockInfo
blockInfo
=
{
0
};
//
GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
SDataBlockInfo
blockInfo
=
GET_FILE_DATA_BLOCK_INFO
(
pCheckInfo
,
pBlock
);
STsdbCfg
*
pCfg
=
&
pTsdbReadHandle
->
pTsdb
->
config
;
initTableMemIterator
(
pTsdbReadHandle
,
pCheckInfo
);
...
...
@@ -1771,9 +1777,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
STable
*
pTable
=
NULL
;
int32_t
endPos
=
getEndPosInDataBlock
(
pTsdbReadHandle
,
&
blockInfo
);
tsdbDebug
(
"%p uid:%"
PRIu64
" start merge data block, file block range:%"
PRIu64
"-%"
PRIu64
" rows:%d, start:%d,"
"end:%d, %s"
,
tsdbDebug
(
"%p uid:%"
PRIu64
" start merge data block, file block range:%"
PRIu64
"-%"
PRIu64
" rows:%d, start:%d, end:%d, %s"
,
pTsdbReadHandle
,
pCheckInfo
->
tableId
,
blockInfo
.
window
.
skey
,
blockInfo
.
window
.
ekey
,
blockInfo
.
rows
,
cur
->
pos
,
endPos
,
pTsdbReadHandle
->
idStr
);
...
...
source/libs/parser/inc/parAst.h
浏览文件 @
2ae4f5fa
...
...
@@ -53,6 +53,7 @@ typedef enum EDatabaseOptionType {
DB_OPTION_VGROUPS
,
DB_OPTION_SINGLE_STABLE
,
DB_OPTION_STREAM_MODE
,
DB_OPTION_STRICT
,
DB_OPTION_RETENTIONS
}
EDatabaseOptionType
;
...
...
source/libs/parser/inc/sql.y
浏览文件 @
2ae4f5fa
...
...
@@ -161,6 +161,7 @@ db_options(A) ::= db_options(B) VGROUPS NK_INTEGER(C).
db_options(A) ::= db_options(B) SINGLE_STABLE NK_INTEGER(C). { ((SDatabaseOptions*)B)->pSingleStable = (SValueNode*)createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &C); A = B; }
db_options(A) ::= db_options(B) STREAM_MODE NK_INTEGER(C). { ((SDatabaseOptions*)B)->pStreamMode = (SValueNode*)createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &C); A = B; }
db_options(A) ::= db_options(B) RETENTIONS retention_list(C). { ((SDatabaseOptions*)B)->pRetentions = C; A = B; }
db_options(A) ::= db_options(B) STRICT NK_INTEGER(C). { ((SDatabaseOptions*)B)->pStrict = (SValueNode*)createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &C); A = B; }
alter_db_options(A) ::= alter_db_option(B). { A = createDatabaseOptions(pCxt); A = setDatabaseAlterOption(pCxt, A, &B); }
alter_db_options(A) ::= alter_db_options(B) alter_db_option(C). { A = setDatabaseAlterOption(pCxt, B, &C); }
...
...
@@ -175,6 +176,7 @@ alter_db_option(A) ::= WAL NK_INTEGER(B).
alter_db_option(A) ::= QUORUM NK_INTEGER(B). { A.type = DB_OPTION_QUORUM; A.pVal = (SValueNode*)createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &B); }
alter_db_option(A) ::= CACHELAST NK_INTEGER(B). { A.type = DB_OPTION_CACHELAST; A.pVal = (SValueNode*)createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &B); }
alter_db_option(A) ::= REPLICA NK_INTEGER(B). { A.type = DB_OPTION_REPLICA; A.pVal = (SValueNode*)createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &B); }
alter_db_option(A) ::= STRICT NK_INTEGER(B). { A.type = DB_OPTION_STRICT; A.pVal = (SValueNode*)createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &B); }
%type integer_list { SNodeList* }
%destructor integer_list { nodesDestroyList($$); }
...
...
@@ -359,7 +361,7 @@ from_db_opt(A) ::= FROM db_name(B).
%type func_name_list { SNodeList* }
%destructor func_name_list { nodesDestroyList($$); }
func_name_list(A) ::= func_name(B). { A = createNodeList(pCxt, B); }
func_name_list(A) ::= func_name_list(B) NK_COMMA
col_name(C).
{ A = addNodeToList(pCxt, B, C); }
func_name_list(A) ::= func_name_list(B) NK_COMMA
func_name(C).
{ A = addNodeToList(pCxt, B, C); }
func_name(A) ::= function_name(B). { A = createFunctionNode(pCxt, &B, NULL); }
...
...
source/libs/parser/src/parAstCreater.c
浏览文件 @
2ae4f5fa
...
...
@@ -677,9 +677,6 @@ SNode* setDatabaseAlterOption(SAstCreateContext* pCxt, SNode* pOptions, SAlterOp
case
DB_OPTION_PRECISION
:
((
SDatabaseOptions
*
)
pOptions
)
->
pPrecision
=
pAlterOption
->
pVal
;
break
;
case
DB_OPTION_QUORUM
:
((
SDatabaseOptions
*
)
pOptions
)
->
pQuorum
=
pAlterOption
->
pVal
;
break
;
case
DB_OPTION_REPLICA
:
((
SDatabaseOptions
*
)
pOptions
)
->
pReplica
=
pAlterOption
->
pVal
;
break
;
...
...
source/libs/parser/src/parTokenizer.c
浏览文件 @
2ae4f5fa
...
...
@@ -168,6 +168,7 @@ static SKeyword keywordTable[] = {
{
"STREAM"
,
TK_STREAM
},
{
"STREAMS"
,
TK_STREAMS
},
{
"STREAM_MODE"
,
TK_STREAM_MODE
},
{
"STRICT"
,
TK_STRICT
},
{
"SYNCDB"
,
TK_SYNCDB
},
{
"TABLE"
,
TK_TABLE
},
{
"TABLES"
,
TK_TABLES
},
...
...
source/libs/parser/src/parTranslater.c
浏览文件 @
2ae4f5fa
此差异已折叠。
点击以展开。
source/libs/parser/src/sql.c
浏览文件 @
2ae4f5fa
此差异已折叠。
点击以展开。
source/libs/scalar/src/sclfunc.c
浏览文件 @
2ae4f5fa
...
...
@@ -304,7 +304,7 @@ static int32_t doLengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarP
continue
;
}
char
*
in
=
pInputData
->
pData
+
pInputData
->
varmeta
.
offset
[
i
]
;
char
*
in
=
colDataGetData
(
pInputData
,
i
)
;
out
[
i
]
=
lenFn
(
in
,
type
);
}
...
...
@@ -395,11 +395,8 @@ int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
int16_t
dataLen
=
0
;
for
(
int32_t
i
=
0
;
i
<
inputNum
;
++
i
)
{
if
(
pInput
[
i
].
numOfRows
==
1
)
{
input
[
i
]
=
pInputData
[
i
]
->
pData
+
pInputData
[
i
]
->
varmeta
.
offset
[
0
];
}
else
{
input
[
i
]
=
pInputData
[
i
]
->
pData
+
pInputData
[
i
]
->
varmeta
.
offset
[
k
];
}
int32_t
rowIdx
=
(
pInput
[
i
].
numOfRows
==
1
)
?
0
:
k
;
input
[
i
]
=
colDataGetData
(
pInputData
[
i
],
rowIdx
);
ret
=
concatCopyHelper
(
input
[
i
],
output
,
hasNchar
,
GET_PARAM_TYPE
(
&
pInput
[
i
]),
&
dataLen
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -473,11 +470,8 @@ int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
break
;
}
if
(
pInput
[
i
].
numOfRows
==
1
)
{
input
[
i
]
=
pInputData
[
i
]
->
pData
+
pInputData
[
i
]
->
varmeta
.
offset
[
0
];
}
else
{
input
[
i
]
=
pInputData
[
i
]
->
pData
+
pInputData
[
i
]
->
varmeta
.
offset
[
k
];
}
int32_t
rowIdx
=
(
pInput
[
i
].
numOfRows
==
1
)
?
0
:
k
;
input
[
i
]
=
colDataGetData
(
pInputData
[
i
],
rowIdx
);
ret
=
concatCopyHelper
(
input
[
i
],
output
,
hasNchar
,
GET_PARAM_TYPE
(
&
pInput
[
i
]),
&
dataLen
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -534,7 +528,7 @@ static int32_t doCaseConvFunction(SScalarParam *pInput, int32_t inputNum, SScala
continue
;
}
char
*
input
=
pInputData
->
pData
+
pInputData
->
varmeta
.
offset
[
i
]
;
char
*
input
=
colDataGetData
(
pInput
[
0
].
columnData
,
i
)
;
int32_t
len
=
varDataLen
(
input
);
if
(
type
==
TSDB_DATA_TYPE_VARCHAR
)
{
for
(
int32_t
j
=
0
;
j
<
len
;
++
j
)
{
...
...
@@ -575,8 +569,8 @@ static int32_t doTrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarPar
colDataAppendNULL
(
pOutputData
,
i
);
continue
;
}
char
*
input
=
pInputData
->
pData
+
pInputData
->
varmeta
.
offset
[
i
];
char
*
input
=
colDataGetData
(
pInput
[
0
].
columnData
,
i
);
int32_t
len
=
varDataLen
(
input
);
int32_t
charLen
=
(
type
==
TSDB_DATA_TYPE_VARCHAR
)
?
len
:
len
/
TSDB_NCHAR_SIZE
;
trimFn
(
input
,
output
,
type
,
charLen
);
...
...
@@ -615,19 +609,16 @@ int32_t substrFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
SColumnInfoData
*
pInputData
=
pInput
->
columnData
;
SColumnInfoData
*
pOutputData
=
pOutput
->
columnData
;
char
*
input
=
pInputData
->
pData
+
pInputData
->
varmeta
.
offset
[
0
];
char
*
output
=
NULL
;
int32_t
outputLen
=
pInputData
->
varmeta
.
length
*
pInput
->
numOfRows
;
char
*
outputBuf
=
taosMemoryCalloc
(
outputLen
,
1
);
output
=
outputBuf
;
char
*
output
=
outputBuf
;
for
(
int32_t
i
=
0
;
i
<
pInput
->
numOfRows
;
++
i
)
{
if
(
colDataIsNull_s
(
pInputData
,
i
))
{
colDataAppendNULL
(
pOutputData
,
i
);
continue
;
}
char
*
input
=
colDataGetData
(
pInput
[
0
].
columnData
,
i
);
int32_t
len
=
varDataLen
(
input
);
int32_t
startPosBytes
;
...
...
@@ -646,7 +637,6 @@ int32_t substrFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
varDataSetLen
(
output
,
resLen
);
colDataAppend
(
pOutputData
,
i
,
output
,
false
);
input
+=
varDataTLen
(
input
);
output
+=
varDataTLen
(
output
);
}
...
...
@@ -799,13 +789,13 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp
int32_t
toISO8601Function
(
SScalarParam
*
pInput
,
int32_t
inputNum
,
SScalarParam
*
pOutput
)
{
int32_t
type
=
GET_PARAM_TYPE
(
pInput
);
char
*
input
=
pInput
[
0
].
columnData
->
pData
;
for
(
int32_t
i
=
0
;
i
<
pInput
[
0
].
numOfRows
;
++
i
)
{
if
(
colDataIsNull_s
(
pInput
[
0
].
columnData
,
i
))
{
colDataAppendNULL
(
pOutput
->
columnData
,
i
);
continue
;
}
char
*
input
=
colDataGetData
(
pInput
[
0
].
columnData
,
i
);
char
fraction
[
20
]
=
{
0
};
bool
hasFraction
=
false
;
NUM_TO_STRING
(
type
,
input
,
sizeof
(
fraction
),
fraction
);
...
...
@@ -822,7 +812,8 @@ int32_t toISO8601Function(SScalarParam *pInput, int32_t inputNum, SScalarParam *
}
else
if
(
tsDigits
==
TSDB_TIME_PRECISION_NANO_DIGITS
)
{
timeVal
=
timeVal
/
(
1000
*
1000
*
1000
);
}
else
{
assert
(
0
);
colDataAppendNULL
(
pOutput
->
columnData
,
i
);
continue
;
}
hasFraction
=
true
;
memmove
(
fraction
,
fraction
+
TSDB_TIME_PRECISION_SEC_DIGITS
,
TSDB_TIME_PRECISION_SEC_DIGITS
);
...
...
@@ -852,7 +843,6 @@ int32_t toISO8601Function(SScalarParam *pInput, int32_t inputNum, SScalarParam *
varDataSetLen
(
buf
,
len
);
colDataAppend
(
pOutput
->
columnData
,
i
,
buf
,
false
);
input
+=
tDataTypes
[
type
].
bytes
;
}
pOutput
->
numOfRows
=
pInput
->
numOfRows
;
...
...
@@ -864,12 +854,12 @@ int32_t toUnixtimestampFunction(SScalarParam *pInput, int32_t inputNum, SScalarP
int32_t
type
=
GET_PARAM_TYPE
(
pInput
);
int32_t
timePrec
=
GET_PARAM_PRECISON
(
pInput
);
char
*
input
=
pInput
[
0
].
columnData
->
pData
+
pInput
[
0
].
columnData
->
varmeta
.
offset
[
0
];
for
(
int32_t
i
=
0
;
i
<
pInput
[
0
].
numOfRows
;
++
i
)
{
if
(
colDataIsNull_s
(
pInput
[
0
].
columnData
,
i
))
{
colDataAppendNULL
(
pOutput
->
columnData
,
i
);
continue
;
}
char
*
input
=
colDataGetData
(
pInput
[
0
].
columnData
,
i
);
int64_t
timeVal
=
0
;
int32_t
ret
=
convertStringToTimestamp
(
type
,
input
,
timePrec
,
&
timeVal
);
...
...
@@ -879,7 +869,6 @@ int32_t toUnixtimestampFunction(SScalarParam *pInput, int32_t inputNum, SScalarP
}
colDataAppend
(
pOutput
->
columnData
,
i
,
(
char
*
)
&
timeVal
,
false
);
input
+=
varDataTLen
(
input
);
}
pOutput
->
numOfRows
=
pInput
->
numOfRows
;
...
...
@@ -897,19 +886,14 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara
int64_t
factor
=
(
timePrec
==
TSDB_TIME_PRECISION_MILLI
)
?
1000
:
(
timePrec
==
TSDB_TIME_PRECISION_MICRO
?
1000000
:
1000000000
);
char
*
input
=
NULL
;
if
(
IS_VAR_DATA_TYPE
(
type
))
{
input
=
pInput
[
0
].
columnData
->
pData
+
pInput
[
0
].
columnData
->
varmeta
.
offset
[
0
];
}
else
{
input
=
pInput
[
0
].
columnData
->
pData
;
}
for
(
int32_t
i
=
0
;
i
<
pInput
[
0
].
numOfRows
;
++
i
)
{
if
(
colDataIsNull_s
(
pInput
[
0
].
columnData
,
i
))
{
colDataAppendNULL
(
pOutput
->
columnData
,
i
);
continue
;
}
char
*
input
=
colDataGetData
(
pInput
[
0
].
columnData
,
i
);
if
(
IS_VAR_DATA_TYPE
(
type
))
{
/* datetime format strings */
int32_t
ret
=
convertStringToTimestamp
(
type
,
input
,
TSDB_TIME_PRECISION_NANO
,
&
timeVal
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -959,7 +943,8 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara
}
else
if
(
tsDigits
<=
TSDB_TIME_PRECISION_SEC_DIGITS
){
timeVal
=
timeVal
*
factor
;
}
else
{
assert
(
0
);
colDataAppendNULL
(
pOutput
->
columnData
,
i
);
continue
;
}
break
;
}
...
...
@@ -973,7 +958,8 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara
}
else
if
(
tsDigits
<=
TSDB_TIME_PRECISION_SEC_DIGITS
)
{
timeVal
=
timeVal
*
factor
;
}
else
{
assert
(
0
);
colDataAppendNULL
(
pOutput
->
columnData
,
i
);
continue
;
}
break
;
}
...
...
@@ -987,7 +973,8 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara
}
else
if
(
tsDigits
<=
TSDB_TIME_PRECISION_SEC_DIGITS
)
{
timeVal
=
timeVal
*
factor
/
factor
/
60
*
60
*
factor
;
}
else
{
assert
(
0
);
colDataAppendNULL
(
pOutput
->
columnData
,
i
);
continue
;
}
break
;
}
...
...
@@ -1001,7 +988,8 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara
}
else
if
(
tsDigits
<=
TSDB_TIME_PRECISION_SEC_DIGITS
)
{
timeVal
=
timeVal
*
factor
/
factor
/
3600
*
3600
*
factor
;
}
else
{
assert
(
0
);
colDataAppendNULL
(
pOutput
->
columnData
,
i
);
continue
;
}
break
;
}
...
...
@@ -1015,7 +1003,8 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara
}
else
if
(
tsDigits
<=
TSDB_TIME_PRECISION_SEC_DIGITS
)
{
timeVal
=
timeVal
*
factor
/
factor
/
86400
*
86400
*
factor
;
}
else
{
assert
(
0
);
colDataAppendNULL
(
pOutput
->
columnData
,
i
);
continue
;
}
break
;
}
...
...
@@ -1029,7 +1018,8 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara
}
else
if
(
tsDigits
<=
TSDB_TIME_PRECISION_SEC_DIGITS
)
{
timeVal
=
timeVal
*
factor
/
factor
/
604800
*
604800
*
factor
;
}
else
{
assert
(
0
);
colDataAppendNULL
(
pOutput
->
columnData
,
i
);
continue
;
}
break
;
}
...
...
@@ -1068,11 +1058,6 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara
}
colDataAppend
(
pOutput
->
columnData
,
i
,
(
char
*
)
&
timeVal
,
false
);
if
(
IS_VAR_DATA_TYPE
(
type
))
{
input
+=
varDataTLen
(
input
);
}
else
{
input
+=
tDataTypes
[
type
].
bytes
;
}
}
pOutput
->
numOfRows
=
pInput
->
numOfRows
;
...
...
@@ -1094,12 +1079,6 @@ int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
type
!=
TSDB_DATA_TYPE_BINARY
&&
type
!=
TSDB_DATA_TYPE_NCHAR
)
{
return
TSDB_CODE_FAILED
;
}
if
(
IS_VAR_DATA_TYPE
(
type
))
{
input
[
k
]
=
pInput
[
k
].
columnData
->
pData
+
pInput
[
k
].
columnData
->
varmeta
.
offset
[
0
];
}
else
{
input
[
k
]
=
pInput
[
k
].
columnData
->
pData
;
}
}
for
(
int32_t
i
=
0
;
i
<
pInput
[
0
].
numOfRows
;
++
i
)
{
...
...
@@ -1109,6 +1088,9 @@ int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
continue
;
}
int32_t
rowIdx
=
(
pInput
[
k
].
numOfRows
==
1
)
?
0
:
i
;
input
[
k
]
=
colDataGetData
(
pInput
[
k
].
columnData
,
rowIdx
);
int32_t
type
=
GET_PARAM_TYPE
(
&
pInput
[
k
]);
if
(
IS_VAR_DATA_TYPE
(
type
))
{
/* datetime format strings */
int32_t
ret
=
convertStringToTimestamp
(
type
,
input
[
k
],
TSDB_TIME_PRECISION_NANO
,
&
timeVal
[
k
]);
...
...
@@ -1138,14 +1120,9 @@ int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
timeVal
[
k
]
=
timeVal
[
k
]
*
1000
;
}
else
if
(
tsDigits
==
TSDB_TIME_PRECISION_NANO_DIGITS
)
{
timeVal
[
k
]
=
timeVal
[
k
];
}
}
if
(
pInput
[
k
].
numOfRows
!=
1
)
{
if
(
IS_VAR_DATA_TYPE
(
type
))
{
input
[
k
]
+=
varDataTLen
(
input
[
k
]);
}
else
{
input
[
k
]
+=
tDataTypes
[
type
].
bytes
;
colDataAppendNULL
(
pOutput
->
columnData
,
i
);
continue
;
}
}
}
...
...
tests/script/jenkins/basic.txt
浏览文件 @
2ae4f5fa
...
...
@@ -55,7 +55,7 @@
# ---- tmq
./test.sh -f tsim/tmq/basic.sim
./test.sh -f tsim/tmq/basic1.sim
#
./test.sh -f tsim/tmq/basic1.sim
#./test.sh -f tsim/tmq/oneTopic.sim
#./test.sh -f tsim/tmq/multiTopic.sim
...
...
tests/script/tsim/tmq/consume.sh
0 → 100755
浏览文件 @
2ae4f5fa
#!/bin/bash
##################################################
#
# Do tmq test
#
##################################################
set
+e
# set default value for parameters
EXEC_OPTON
=
start
DB_NAME
=
db
POLL_DELAY
=
5
VALGRIND
=
0
SIGNAL
=
SIGINT
while
getopts
"d:s:v:y:x:"
arg
do
case
$arg
in
d
)
DB_NAME
=
$OPTARG
;;
s
)
EXEC_OPTON
=
$OPTARG
;;
v
)
VALGRIND
=
1
;;
y
)
POLL_DELAY
=
$OPTARG
;;
x
)
SIGNAL
=
$OPTARG
;;
?
)
echo
"unkown argument"
;;
esac
done
SCRIPT_DIR
=
`
pwd
`
IN_TDINTERNAL
=
"community"
if
[[
"
$SCRIPT_DIR
"
==
*
"
$IN_TDINTERNAL
"
*
]]
;
then
cd
../../..
else
cd
../../
fi
TOP_DIR
=
`
pwd
`
if
[[
"
$SCRIPT_DIR
"
==
*
"
$IN_TDINTERNAL
"
*
]]
;
then
BIN_DIR
=
`
find
.
-name
"tmq_sim"
|grep bin|head
-n1
|cut
-d
'/'
-f
2,3
`
else
BIN_DIR
=
`
find
.
-name
"tmq_sim"
|grep bin|head
-n1
|cut
-d
'/'
-f
2
`
fi
declare
-x
BUILD_DIR
=
$TOP_DIR
/
$BIN_DIR
declare
-x
SIM_DIR
=
$TOP_DIR
/sim
PROGRAM
=
$BUILD_DIR
/build/bin/tmq_sim
PRG_DIR
=
$SIM_DIR
/tsim
CFG_DIR
=
$PRG_DIR
/cfg
LOG_DIR
=
$PRG_DIR
/log
echo
"------------------------------------------------------------------------"
echo
"BUILD_DIR:
$BUILD_DIR
"
echo
"SIM_DIR :
$SIM_DIR
"
echo
"CFG_DIR :
$CFG_DIR
"
echo
"PROGRAM:
$PROGRAM
echo "
CFG_DIR:
$CFG_DIR
echo
"POLL_DELAY:
$POLL_DELAY
echo "
DB_NAME:
$DB_NAME
echo
"------------------------------------------------------------------------"
if
[
"
$EXEC_OPTON
"
=
"start"
]
;
then
if
[
$VALGRIND
-eq
1
]
;
then
echo nohup
valgrind
--tool
=
memcheck
--leak-check
=
full
--show-reachable
=
no
--track-origins
=
yes
--show-leak-kinds
=
all
-v
--workaround-gcc296-bugs
=
yes
--log-file
=
${
LOG_DIR
}
/valgrind-tmq_sim.log
$PROGRAM
-c
$CFG_DIR
-d
$DB_NAME
-y
$POLL_DELAY
>
/dev/null 2>&1 &
nohup
valgrind
--tool
=
memcheck
--leak-check
=
full
--show-reachable
=
no
--track-origins
=
yes
--show-leak-kinds
=
all
-v
--workaround-gcc296-bugs
=
yes
--log-file
=
${
LOG_DIR
}
/valgrind-tmq_sim.log
$PROGRAM
-c
$CFG_DIR
-d
$DB_NAME
-y
$POLL_DELAY
>
/dev/null 2>&1 &
else
echo
"nohup
$PROGRAM
-c
$CFG_DIR
-d
$DB_NAME
-y
$POLL_DELAY
> /dev/null 2>&1 &"
nohup
$PROGRAM
-c
$CFG_DIR
-y
$POLL_DELAY
-d
$DB_NAME
>
/dev/null 2>&1 &
fi
else
PID
=
`
ps
-ef
|grep tmq_sim |
grep
-v
grep
|
awk
'{print $2}'
`
while
[
-n
"
$PID
"
]
do
if
[
"
$SIGNAL
"
=
"SIGKILL"
]
;
then
echo
try to
kill
by signal SIGKILL
kill
-9
$PID
else
echo
try to
kill
by signal SIGINT
kill
-SIGINT
$PID
fi
sleep
1
PID
=
`
ps
-ef
|grep tmq_sim |
grep
-v
grep
|
awk
'{print $2}'
`
done
fi
tests/test/c/tmqSim.c
浏览文件 @
2ae4f5fa
此差异已折叠。
点击以展开。
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录