Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
8c2e9842
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
8c2e9842
编写于
6月 30, 2022
作者:
X
Xiaoyu Wang
提交者:
GitHub
6月 30, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #14393 from taosdata/feature/3.0_debug_wxy
feat: support 'select *, expr from ...' syntax
上级
068924cc
42fba648
变更
20
展开全部
隐藏空白更改
内联
并排
Showing
20 changed file
with
2577 addition
and
2496 deletion
+2577
-2496
include/common/tmsg.h
include/common/tmsg.h
+1
-0
include/common/ttokendef.h
include/common/ttokendef.h
+75
-73
include/libs/nodes/cmdnodes.h
include/libs/nodes/cmdnodes.h
+1
-0
include/libs/nodes/plannodes.h
include/libs/nodes/plannodes.h
+4
-6
include/libs/planner/planner.h
include/libs/planner/planner.h
+1
-0
source/common/src/tmsg.c
source/common/src/tmsg.c
+2
-1
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+1
-1
source/libs/nodes/src/nodesCloneFuncs.c
source/libs/nodes/src/nodesCloneFuncs.c
+5
-6
source/libs/nodes/src/nodesCodeFuncs.c
source/libs/nodes/src/nodesCodeFuncs.c
+20
-29
source/libs/parser/inc/sql.y
source/libs/parser/inc/sql.y
+4
-7
source/libs/parser/src/parInsert.c
source/libs/parser/src/parInsert.c
+69
-1
source/libs/parser/src/parTokenizer.c
source/libs/parser/src/parTokenizer.c
+3
-1
source/libs/parser/src/parTranslater.c
source/libs/parser/src/parTranslater.c
+45
-25
source/libs/parser/src/sql.c
source/libs/parser/src/sql.c
+2295
-2291
source/libs/parser/test/parInitialCTest.cpp
source/libs/parser/test/parInitialCTest.cpp
+21
-17
source/libs/planner/src/planLogicCreater.c
source/libs/planner/src/planLogicCreater.c
+1
-4
source/libs/planner/src/planOptimizer.c
source/libs/planner/src/planOptimizer.c
+1
-2
source/libs/planner/src/planPhysiCreater.c
source/libs/planner/src/planPhysiCreater.c
+2
-3
tests/test/c/tmqSim.c
tests/test/c/tmqSim.c
+21
-23
tools/shell/src/shellEngine.c
tools/shell/src/shellEngine.c
+5
-6
未找到文件。
include/common/tmsg.h
浏览文件 @
8c2e9842
...
...
@@ -1629,6 +1629,7 @@ typedef struct {
int8_t
triggerType
;
int64_t
maxDelay
;
int64_t
watermark
;
int8_t
igExpired
;
}
SCMCreateStreamReq
;
typedef
struct
{
...
...
include/common/ttokendef.h
浏览文件 @
8c2e9842
...
...
@@ -192,79 +192,81 @@
#define TK_TRIGGER 174
#define TK_AT_ONCE 175
#define TK_WINDOW_CLOSE 176
#define TK_KILL 177
#define TK_CONNECTION 178
#define TK_TRANSACTION 179
#define TK_BALANCE 180
#define TK_VGROUP 181
#define TK_MERGE 182
#define TK_REDISTRIBUTE 183
#define TK_SPLIT 184
#define TK_SYNCDB 185
#define TK_DELETE 186
#define TK_NULL 187
#define TK_NK_QUESTION 188
#define TK_NK_ARROW 189
#define TK_ROWTS 190
#define TK_TBNAME 191
#define TK_QSTARTTS 192
#define TK_QENDTS 193
#define TK_WSTARTTS 194
#define TK_WENDTS 195
#define TK_WDURATION 196
#define TK_CAST 197
#define TK_NOW 198
#define TK_TODAY 199
#define TK_TIMEZONE 200
#define TK_CLIENT_VERSION 201
#define TK_SERVER_VERSION 202
#define TK_SERVER_STATUS 203
#define TK_CURRENT_USER 204
#define TK_COUNT 205
#define TK_LAST_ROW 206
#define TK_BETWEEN 207
#define TK_IS 208
#define TK_NK_LT 209
#define TK_NK_GT 210
#define TK_NK_LE 211
#define TK_NK_GE 212
#define TK_NK_NE 213
#define TK_MATCH 214
#define TK_NMATCH 215
#define TK_CONTAINS 216
#define TK_JOIN 217
#define TK_INNER 218
#define TK_SELECT 219
#define TK_DISTINCT 220
#define TK_WHERE 221
#define TK_PARTITION 222
#define TK_BY 223
#define TK_SESSION 224
#define TK_STATE_WINDOW 225
#define TK_SLIDING 226
#define TK_FILL 227
#define TK_VALUE 228
#define TK_NONE 229
#define TK_PREV 230
#define TK_LINEAR 231
#define TK_NEXT 232
#define TK_HAVING 233
#define TK_RANGE 234
#define TK_EVERY 235
#define TK_ORDER 236
#define TK_SLIMIT 237
#define TK_SOFFSET 238
#define TK_LIMIT 239
#define TK_OFFSET 240
#define TK_ASC 241
#define TK_NULLS 242
#define TK_ID 243
#define TK_NK_BITNOT 244
#define TK_INSERT 245
#define TK_VALUES 246
#define TK_IMPORT 247
#define TK_NK_SEMI 248
#define TK_FILE 249
#define TK_IGNORE 177
#define TK_EXPIRED 178
#define TK_KILL 179
#define TK_CONNECTION 180
#define TK_TRANSACTION 181
#define TK_BALANCE 182
#define TK_VGROUP 183
#define TK_MERGE 184
#define TK_REDISTRIBUTE 185
#define TK_SPLIT 186
#define TK_SYNCDB 187
#define TK_DELETE 188
#define TK_NULL 189
#define TK_NK_QUESTION 190
#define TK_NK_ARROW 191
#define TK_ROWTS 192
#define TK_TBNAME 193
#define TK_QSTARTTS 194
#define TK_QENDTS 195
#define TK_WSTARTTS 196
#define TK_WENDTS 197
#define TK_WDURATION 198
#define TK_CAST 199
#define TK_NOW 200
#define TK_TODAY 201
#define TK_TIMEZONE 202
#define TK_CLIENT_VERSION 203
#define TK_SERVER_VERSION 204
#define TK_SERVER_STATUS 205
#define TK_CURRENT_USER 206
#define TK_COUNT 207
#define TK_LAST_ROW 208
#define TK_BETWEEN 209
#define TK_IS 210
#define TK_NK_LT 211
#define TK_NK_GT 212
#define TK_NK_LE 213
#define TK_NK_GE 214
#define TK_NK_NE 215
#define TK_MATCH 216
#define TK_NMATCH 217
#define TK_CONTAINS 218
#define TK_JOIN 219
#define TK_INNER 220
#define TK_SELECT 221
#define TK_DISTINCT 222
#define TK_WHERE 223
#define TK_PARTITION 224
#define TK_BY 225
#define TK_SESSION 226
#define TK_STATE_WINDOW 227
#define TK_SLIDING 228
#define TK_FILL 229
#define TK_VALUE 230
#define TK_NONE 231
#define TK_PREV 232
#define TK_LINEAR 233
#define TK_NEXT 234
#define TK_HAVING 235
#define TK_RANGE 236
#define TK_EVERY 237
#define TK_ORDER 238
#define TK_SLIMIT 239
#define TK_SOFFSET 240
#define TK_LIMIT 241
#define TK_OFFSET 242
#define TK_ASC 243
#define TK_NULLS 244
#define TK_ID 245
#define TK_NK_BITNOT 246
#define TK_INSERT 247
#define TK_VALUES 248
#define TK_IMPORT 249
#define TK_NK_SEMI 250
#define TK_FILE 251
#define TK_NK_SPACE 300
#define TK_NK_COMMENT 301
...
...
include/libs/nodes/cmdnodes.h
浏览文件 @
8c2e9842
...
...
@@ -338,6 +338,7 @@ typedef struct SStreamOptions {
int8_t
triggerType
;
SNode
*
pDelay
;
SNode
*
pWatermark
;
bool
ignoreExpired
;
}
SStreamOptions
;
typedef
struct
SCreateStreamStmt
{
...
...
include/libs/nodes/plannodes.h
浏览文件 @
8c2e9842
...
...
@@ -73,8 +73,7 @@ typedef struct SScanLogicNode {
SNode
*
pTagIndexCond
;
int8_t
triggerType
;
int64_t
watermark
;
int16_t
tsColId
;
double
filesFactor
;
int8_t
igExpired
;
SArray
*
pSmaIndexes
;
SNodeList
*
pGroupTags
;
bool
groupSort
;
...
...
@@ -175,7 +174,7 @@ typedef struct SWindowLogicNode {
SNode
*
pStateExpr
;
int8_t
triggerType
;
int64_t
watermark
;
double
filesFactor
;
int8_t
igExpired
;
EWindowAlgorithm
windowAlgo
;
}
SWindowLogicNode
;
...
...
@@ -296,8 +295,7 @@ typedef struct STableScanPhysiNode {
int8_t
slidingUnit
;
int8_t
triggerType
;
int64_t
watermark
;
int16_t
tsColId
;
double
filesFactor
;
int8_t
igExpired
;
}
STableScanPhysiNode
;
typedef
STableScanPhysiNode
STableSeqScanPhysiNode
;
...
...
@@ -374,7 +372,7 @@ typedef struct SWinodwPhysiNode {
SNode
*
pTsEnd
;
// window end timestamp
int8_t
triggerType
;
int64_t
watermark
;
double
filesFactor
;
int8_t
igExpired
;
}
SWinodwPhysiNode
;
typedef
struct
SIntervalPhysiNode
{
...
...
include/libs/planner/planner.h
浏览文件 @
8c2e9842
...
...
@@ -34,6 +34,7 @@ typedef struct SPlanContext {
bool
showRewrite
;
int8_t
triggerType
;
int64_t
watermark
;
int8_t
igExpired
;
char
*
pMsg
;
int32_t
msgLen
;
const
char
*
pUser
;
...
...
source/common/src/tmsg.c
浏览文件 @
8c2e9842
...
...
@@ -4643,6 +4643,7 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
if
(
tEncodeI8
(
&
encoder
,
pReq
->
triggerType
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
pReq
->
maxDelay
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
pReq
->
watermark
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pReq
->
igExpired
)
<
0
)
return
-
1
;
if
(
sqlLen
>
0
&&
tEncodeCStr
(
&
encoder
,
pReq
->
sql
)
<
0
)
return
-
1
;
if
(
astLen
>
0
&&
tEncodeCStr
(
&
encoder
,
pReq
->
ast
)
<
0
)
return
-
1
;
...
...
@@ -4670,6 +4671,7 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
triggerType
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
&
decoder
,
&
pReq
->
maxDelay
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
&
decoder
,
&
pReq
->
watermark
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
igExpired
)
<
0
)
return
-
1
;
if
(
sqlLen
>
0
)
{
pReq
->
sql
=
taosMemoryCalloc
(
1
,
sqlLen
+
1
);
...
...
@@ -5504,4 +5506,3 @@ int32_t tDecodeSMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) {
}
return
0
;
}
source/libs/executor/src/scanoperator.c
浏览文件 @
8c2e9842
...
...
@@ -1213,7 +1213,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
int16_t
colId
=
id
->
colId
;
taosArrayPush
(
pColIds
,
&
colId
);
if
(
id
->
colId
==
pTableScanNode
->
tsColId
)
{
if
(
id
->
colId
==
PRIMARYKEY_TIMESTAMP_COL_ID
)
{
pInfo
->
primaryTsIndex
=
id
->
targetSlotId
;
}
}
...
...
source/libs/nodes/src/nodesCloneFuncs.c
浏览文件 @
8c2e9842
...
...
@@ -350,9 +350,9 @@ static int32_t logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) {
CLONE_NODE_FIELD
(
pTagIndexCond
);
COPY_SCALAR_FIELD
(
triggerType
);
COPY_SCALAR_FIELD
(
watermark
);
COPY_SCALAR_FIELD
(
tsColId
);
COPY_SCALAR_FIELD
(
filesFactor
);
COPY_SCALAR_FIELD
(
igExpired
);
CLONE_NODE_LIST_FIELD
(
pGroupTags
);
COPY_SCALAR_FIELD
(
groupSort
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -421,7 +421,7 @@ static int32_t logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* p
CLONE_NODE_FIELD
(
pStateExpr
);
COPY_SCALAR_FIELD
(
triggerType
);
COPY_SCALAR_FIELD
(
watermark
);
COPY_SCALAR_FIELD
(
filesFactor
);
COPY_SCALAR_FIELD
(
igExpired
);
COPY_SCALAR_FIELD
(
windowAlgo
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -511,8 +511,7 @@ static int32_t physiTableScanCopy(const STableScanPhysiNode* pSrc, STableScanPhy
COPY_SCALAR_FIELD
(
slidingUnit
);
COPY_SCALAR_FIELD
(
triggerType
);
COPY_SCALAR_FIELD
(
watermark
);
COPY_SCALAR_FIELD
(
tsColId
);
COPY_SCALAR_FIELD
(
filesFactor
);
COPY_SCALAR_FIELD
(
igExpired
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -532,7 +531,7 @@ static int32_t physiWindowCopy(const SWinodwPhysiNode* pSrc, SWinodwPhysiNode* p
CLONE_NODE_FIELD
(
pTsEnd
);
COPY_SCALAR_FIELD
(
triggerType
);
COPY_SCALAR_FIELD
(
watermark
);
COPY_SCALAR_FIELD
(
filesFactor
);
COPY_SCALAR_FIELD
(
igExpired
);
return
TSDB_CODE_SUCCESS
;
}
...
...
source/libs/nodes/src/nodesCodeFuncs.c
浏览文件 @
8c2e9842
...
...
@@ -1426,12 +1426,11 @@ static const char* jkTableScanPhysiPlanDynamicScanFuncs = "DynamicScanFuncs";
static
const
char
*
jkTableScanPhysiPlanInterval
=
"Interval"
;
static
const
char
*
jkTableScanPhysiPlanOffset
=
"Offset"
;
static
const
char
*
jkTableScanPhysiPlanSliding
=
"Sliding"
;
static
const
char
*
jkTableScanPhysiPlanIntervalUnit
=
"intervalUnit"
;
static
const
char
*
jkTableScanPhysiPlanSlidingUnit
=
"slidingUnit"
;
static
const
char
*
jkTableScanPhysiPlanTriggerType
=
"triggerType"
;
static
const
char
*
jkTableScanPhysiPlanWatermark
=
"watermark"
;
static
const
char
*
jkTableScanPhysiPlanTsColId
=
"tsColId"
;
static
const
char
*
jkTableScanPhysiPlanFilesFactor
=
"FilesFactor"
;
static
const
char
*
jkTableScanPhysiPlanIntervalUnit
=
"IntervalUnit"
;
static
const
char
*
jkTableScanPhysiPlanSlidingUnit
=
"SlidingUnit"
;
static
const
char
*
jkTableScanPhysiPlanTriggerType
=
"TriggerType"
;
static
const
char
*
jkTableScanPhysiPlanWatermark
=
"Watermark"
;
static
const
char
*
jkTableScanPhysiPlanIgnoreExpired
=
"IgnoreExpired"
;
static
const
char
*
jkTableScanPhysiPlanGroupTags
=
"GroupTags"
;
static
const
char
*
jkTableScanPhysiPlanGroupSort
=
"GroupSort"
;
...
...
@@ -1482,10 +1481,7 @@ static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
code
=
tjsonAddIntegerToObject
(
pJson
,
jkTableScanPhysiPlanWatermark
,
pNode
->
watermark
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkTableScanPhysiPlanTsColId
,
pNode
->
tsColId
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddDoubleToObject
(
pJson
,
jkTableScanPhysiPlanFilesFactor
,
pNode
->
filesFactor
);
code
=
tjsonAddIntegerToObject
(
pJson
,
jkTableScanPhysiPlanIgnoreExpired
,
pNode
->
igExpired
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodeListToJson
(
pJson
,
jkTableScanPhysiPlanGroupTags
,
pNode
->
pGroupTags
);
...
...
@@ -1517,37 +1513,34 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) {
code
=
tjsonGetDoubleValue
(
pJson
,
jkTableScanPhysiPlanRatio
,
&
pNode
->
ratio
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
tjsonGetNumberValue
(
pJson
,
jkTableScanPhysiPlanDataRequired
,
pNode
->
dataRequired
,
code
);
code
=
tjsonGetIntValue
(
pJson
,
jkTableScanPhysiPlanDataRequired
,
&
pNode
->
dataRequired
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
jsonToNodeList
(
pJson
,
jkTableScanPhysiPlanDynamicScanFuncs
,
&
pNode
->
pDynamicScanFuncs
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
tjsonGetNumberValue
(
pJson
,
jkTableScanPhysiPlanInterval
,
pNode
->
interval
,
code
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
tjsonGetNumberValue
(
pJson
,
jkTableScanPhysiPlanOffset
,
pNode
->
offset
,
code
);
code
=
tjsonGetBigIntValue
(
pJson
,
jkTableScanPhysiPlanInterval
,
&
pNode
->
interval
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
tjsonGetNumberValue
(
pJson
,
jkTableScanPhysiPlanSliding
,
pNode
->
sliding
,
code
);
code
=
tjsonGetBigIntValue
(
pJson
,
jkTableScanPhysiPlanOffset
,
&
pNode
->
offset
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
tjsonGetNumberValue
(
pJson
,
jkTableScanPhysiPlanIntervalUnit
,
pNode
->
intervalUnit
,
code
);
code
=
tjsonGetBigIntValue
(
pJson
,
jkTableScanPhysiPlanSliding
,
&
pNode
->
sliding
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
tjsonGetNumberValue
(
pJson
,
jkTableScanPhysiPlanSlidingUnit
,
pNode
->
slidingUnit
,
code
);
code
=
tjsonGetTinyIntValue
(
pJson
,
jkTableScanPhysiPlanIntervalUnit
,
&
pNode
->
intervalUnit
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
tjsonGetNumberValue
(
pJson
,
jkTableScanPhysiPlanTriggerType
,
pNode
->
triggerType
,
code
);
code
=
tjsonGetTinyIntValue
(
pJson
,
jkTableScanPhysiPlanSlidingUnit
,
&
pNode
->
slidingUnit
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
tjsonGetNumberValue
(
pJson
,
jkTableScanPhysiPlanWatermark
,
pNode
->
watermark
,
cod
e
);
code
=
tjsonGetTinyIntValue
(
pJson
,
jkTableScanPhysiPlanTriggerType
,
&
pNode
->
triggerTyp
e
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
tjsonGetNumberValue
(
pJson
,
jkTableScanPhysiPlanTsColId
,
pNode
->
tsColId
,
code
);
code
=
tjsonGetBigIntValue
(
pJson
,
jkTableScanPhysiPlanWatermark
,
&
pNode
->
watermark
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGet
DoubleValue
(
pJson
,
jkTableScanPhysiPlanFilesFactor
,
&
pNode
->
filesFactor
);
code
=
tjsonGet
TinyIntValue
(
pJson
,
jkTableScanPhysiPlanIgnoreExpired
,
&
pNode
->
igExpired
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
jsonToNodeList
(
pJson
,
jkTableScanPhysiPlanGroupTags
,
&
pNode
->
pGroupTags
);
...
...
@@ -1826,7 +1819,7 @@ static const char* jkWindowPhysiPlanTsPk = "TsPk";
static
const
char
*
jkWindowPhysiPlanTsEnd
=
"TsEnd"
;
static
const
char
*
jkWindowPhysiPlanTriggerType
=
"TriggerType"
;
static
const
char
*
jkWindowPhysiPlanWatermark
=
"Watermark"
;
static
const
char
*
jkWindowPhysiPlan
FilesFactor
=
"FilesFactor
"
;
static
const
char
*
jkWindowPhysiPlan
IgnoreExpired
=
"IgnoreExpired
"
;
static
int32_t
physiWindowNodeToJson
(
const
void
*
pObj
,
SJson
*
pJson
)
{
const
SWinodwPhysiNode
*
pNode
=
(
const
SWinodwPhysiNode
*
)
pObj
;
...
...
@@ -1851,7 +1844,7 @@ static int32_t physiWindowNodeToJson(const void* pObj, SJson* pJson) {
code
=
tjsonAddIntegerToObject
(
pJson
,
jkWindowPhysiPlanWatermark
,
pNode
->
watermark
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAdd
DoubleToObject
(
pJson
,
jkWindowPhysiPlanFilesFactor
,
pNode
->
filesFactor
);
code
=
tjsonAdd
IntegerToObject
(
pJson
,
jkWindowPhysiPlanIgnoreExpired
,
pNode
->
igExpired
);
}
return
code
;
...
...
@@ -1874,15 +1867,13 @@ static int32_t jsonToPhysiWindowNode(const SJson* pJson, void* pObj) {
code
=
jsonToNodeObject
(
pJson
,
jkWindowPhysiPlanTsEnd
,
(
SNode
**
)
&
pNode
->
pTsEnd
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
tjsonGetNumberValue
(
pJson
,
jkWindowPhysiPlanTriggerType
,
pNode
->
triggerType
,
code
);
;
code
=
tjsonGetTinyIntValue
(
pJson
,
jkWindowPhysiPlanTriggerType
,
&
pNode
->
triggerType
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
tjsonGetNumberValue
(
pJson
,
jkWindowPhysiPlanWatermark
,
pNode
->
watermark
,
code
);
;
code
=
tjsonGetBigIntValue
(
pJson
,
jkWindowPhysiPlanWatermark
,
&
pNode
->
watermark
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGet
DoubleValue
(
pJson
,
jkWindowPhysiPlanFilesFactor
,
&
pNode
->
filesFactor
);
code
=
tjsonGet
TinyIntValue
(
pJson
,
jkWindowPhysiPlanIgnoreExpired
,
&
pNode
->
igExpired
);
}
return
code
;
...
...
source/libs/parser/inc/sql.y
浏览文件 @
8c2e9842
...
...
@@ -488,6 +488,7 @@ stream_options(A) ::= stream_options(B) TRIGGER AT_ONCE.
stream_options(A) ::= stream_options(B) TRIGGER WINDOW_CLOSE. { ((SStreamOptions*)B)->triggerType = STREAM_TRIGGER_WINDOW_CLOSE; A = B; }
stream_options(A) ::= stream_options(B) TRIGGER MAX_DELAY duration_literal(C). { ((SStreamOptions*)B)->triggerType = STREAM_TRIGGER_MAX_DELAY; ((SStreamOptions*)B)->pDelay = releaseRawExprNode(pCxt, C); A = B; }
stream_options(A) ::= stream_options(B) WATERMARK duration_literal(C). { ((SStreamOptions*)B)->pWatermark = releaseRawExprNode(pCxt, C); A = B; }
stream_options(A) ::= stream_options(B) IGNORE EXPIRED. { ((SStreamOptions*)B)->ignoreExpired = true; A = B; }
/************************************************ kill connection/query ***********************************************/
cmd ::= KILL CONNECTION NK_INTEGER(A). { pCxt->pRootNode = createKillStmt(pCxt, QUERY_NODE_KILL_CONNECTION_STMT, &A); }
...
...
@@ -848,14 +849,10 @@ set_quantifier_opt(A) ::= ALL.
%type select_list { SNodeList* }
%destructor select_list { nodesDestroyList($$); }
select_list(A) ::= NK_STAR. { A = NULL; }
select_list(A) ::= select_sublist(B). { A = B; }
%type select_sublist { SNodeList* }
%destructor select_sublist { nodesDestroyList($$); }
select_sublist(A) ::= select_item(B). { A = createNodeList(pCxt, B); }
select_sublist(A) ::= select_sublist(B) NK_COMMA select_item(C). { A = addNodeToList(pCxt, B, C); }
select_list(A) ::= select_item(B). { A = createNodeList(pCxt, B); }
select_list(A) ::= select_list(B) NK_COMMA select_item(C). { A = addNodeToList(pCxt, B, C); }
select_item(A) ::= NK_STAR(B). { A = createColumnNode(pCxt, NULL, &B); }
select_item(A) ::= common_expression(B). { A = releaseRawExprNode(pCxt, B); }
select_item(A) ::= common_expression(B) column_alias(C). { A = setProjectionAlias(pCxt, releaseRawExprNode(pCxt, B), &C); }
select_item(A) ::= common_expression(B) AS column_alias(C). { A = setProjectionAlias(pCxt, releaseRawExprNode(pCxt, B), &C); }
...
...
source/libs/parser/src/parInsert.c
浏览文件 @
8c2e9842
...
...
@@ -1302,6 +1302,74 @@ static int32_t parseValuesClause(SInsertParseContext* pCxt, STableDataBlocks* da
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
parseCsvFile
(
SInsertParseContext
*
pCxt
,
TdFilePtr
fp
,
STableDataBlocks
*
pDataBlock
,
int
maxRows
,
int32_t
*
numOfRows
)
{
STableComInfo
tinfo
=
getTableInfo
(
pDataBlock
->
pTableMeta
);
int32_t
extendedRowSize
=
getExtendedRowSize
(
pDataBlock
);
CHECK_CODE
(
initRowBuilder
(
&
pDataBlock
->
rowBuilder
,
pDataBlock
->
pTableMeta
->
sversion
,
&
pDataBlock
->
boundColumnInfo
));
(
*
numOfRows
)
=
0
;
char
tmpTokenBuf
[
TSDB_MAX_BYTES_PER_ROW
]
=
{
0
};
// used for deleting Escape character: \\, \', \"
char
*
pLine
=
NULL
;
int64_t
readLen
=
0
;
while
((
readLen
=
taosGetLineFile
(
fp
,
&
pLine
))
!=
-
1
)
{
if
((
'\r'
==
pLine
[
readLen
-
1
])
||
(
'\n'
==
pLine
[
readLen
-
1
]))
{
pLine
[
--
readLen
]
=
'\0'
;
}
if
(
readLen
==
0
)
{
continue
;
}
if
((
*
numOfRows
)
>=
maxRows
||
pDataBlock
->
size
+
extendedRowSize
>=
pDataBlock
->
nAllocSize
)
{
int32_t
tSize
;
CHECK_CODE
(
allocateMemIfNeed
(
pDataBlock
,
extendedRowSize
,
&
tSize
));
ASSERT
(
tSize
>=
maxRows
);
maxRows
=
tSize
;
}
strtolower
(
pLine
,
pLine
);
char
*
pRawSql
=
pCxt
->
pSql
;
pCxt
->
pSql
=
pLine
;
bool
gotRow
=
false
;
CHECK_CODE
(
parseOneRow
(
pCxt
,
pDataBlock
,
tinfo
.
precision
,
&
gotRow
,
tmpTokenBuf
));
if
(
gotRow
)
{
pDataBlock
->
size
+=
extendedRowSize
;
// len;
(
*
numOfRows
)
++
;
}
pCxt
->
pSql
=
pRawSql
;
}
if
(
0
==
(
*
numOfRows
)
&&
(
!
TSDB_QUERY_HAS_TYPE
(
pCxt
->
pOutput
->
insertType
,
TSDB_QUERY_TYPE_STMT_INSERT
)))
{
return
buildSyntaxErrMsg
(
&
pCxt
->
msg
,
"no any data points"
,
NULL
);
}
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
parseDataFromFile
(
SInsertParseContext
*
pCxt
,
SToken
filePath
,
STableDataBlocks
*
dataBuf
)
{
char
filePathStr
[
TSDB_FILENAME_LEN
]
=
{
0
};
strncpy
(
filePathStr
,
filePath
.
z
,
filePath
.
n
);
TdFilePtr
fp
=
taosOpenFile
(
filePathStr
,
TD_FILE_READ
|
TD_FILE_STREAM
);
if
(
NULL
==
fp
)
{
return
TAOS_SYSTEM_ERROR
(
errno
);
}
int32_t
maxNumOfRows
;
CHECK_CODE
(
allocateMemIfNeed
(
dataBuf
,
getExtendedRowSize
(
dataBuf
),
&
maxNumOfRows
));
int32_t
numOfRows
=
0
;
CHECK_CODE
(
parseCsvFile
(
pCxt
,
fp
,
dataBuf
,
maxNumOfRows
,
&
numOfRows
));
SSubmitBlk
*
pBlocks
=
(
SSubmitBlk
*
)(
dataBuf
->
pData
);
if
(
TSDB_CODE_SUCCESS
!=
setBlockInfo
(
pBlocks
,
dataBuf
,
numOfRows
))
{
return
buildInvalidOperationMsg
(
&
pCxt
->
msg
,
"too many rows in sql, total number of rows should be less than 32767"
);
}
dataBuf
->
numOfTables
=
1
;
pCxt
->
totalNum
+=
numOfRows
;
return
TSDB_CODE_SUCCESS
;
}
void
destroyCreateSubTbReq
(
SVCreateTbReq
*
pReq
)
{
taosMemoryFreeClear
(
pReq
->
name
);
taosMemoryFreeClear
(
pReq
->
ctb
.
pTag
);
...
...
@@ -1421,7 +1489,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
if
(
0
==
sToken
.
n
||
(
TK_NK_STRING
!=
sToken
.
type
&&
TK_NK_ID
!=
sToken
.
type
))
{
return
buildSyntaxErrMsg
(
&
pCxt
->
msg
,
"file path is required following keyword FILE"
,
sToken
.
z
);
}
// todo
CHECK_CODE
(
parseDataFromFile
(
pCxt
,
sToken
,
dataBuf
));
pCxt
->
pOutput
->
insertType
=
TSDB_QUERY_TYPE_FILE_INSERT
;
tbNum
++
;
...
...
source/libs/parser/src/parTokenizer.c
浏览文件 @
8c2e9842
...
...
@@ -84,8 +84,10 @@ static SKeyword keywordTable[] = {
{
"DURATION"
,
TK_DURATION
},
{
"ENABLE"
,
TK_ENABLE
},
{
"EXISTS"
,
TK_EXISTS
},
{
"EXPIRED"
,
TK_EXPIRED
},
{
"EXPLAIN"
,
TK_EXPLAIN
},
{
"EVERY"
,
TK_EVERY
},
{
"FILE"
,
TK_FILE
},
{
"FILL"
,
TK_FILL
},
{
"FIRST"
,
TK_FIRST
},
{
"FLOAT"
,
TK_FLOAT
},
...
...
@@ -98,6 +100,7 @@ static SKeyword keywordTable[] = {
{
"GROUP"
,
TK_GROUP
},
{
"HAVING"
,
TK_HAVING
},
{
"IF"
,
TK_IF
},
{
"IGNORE"
,
TK_IGNORE
},
{
"IMPORT"
,
TK_IMPORT
},
{
"IN"
,
TK_IN
},
{
"INDEX"
,
TK_INDEX
},
...
...
@@ -289,7 +292,6 @@ static SKeyword keywordTable[] = {
// {"END", TK_END},
// {"FAIL", TK_FAIL},
// {"FOR", TK_FOR},
// {"IGNORE", TK_IGNORE},
// {"IMMEDIATE", TK_IMMEDIATE},
// {"INITIALLY", TK_INITIALLY},
// {"INSTEAD", TK_INSTEAD},
...
...
source/libs/parser/src/parTranslater.c
浏览文件 @
8c2e9842
...
...
@@ -1984,34 +1984,38 @@ static int32_t createMultiResFuncsFromStar(STranslateContext* pCxt, SFunctionNod
}
static
int32_t
translateStar
(
STranslateContext
*
pCxt
,
SSelectStmt
*
pSelect
)
{
if
(
NULL
==
pSelect
->
pProjectionList
)
{
// select * ...
return
createAllColumns
(
pCxt
,
&
pSelect
->
pProjectionList
);
}
else
{
SNode
*
pNode
=
NULL
;
WHERE_EACH
(
pNode
,
pSelect
->
pProjectionList
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
(
isMultiResFunc
(
pNode
))
{
SNodeList
*
pFuncs
=
NULL
;
code
=
createMultiResFuncsFromStar
(
pCxt
,
(
SFunctionNode
*
)
pNode
,
&
pFuncs
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
INSERT_LIST
(
pSelect
->
pProjectionList
,
pFuncs
);
ERASE_NODE
(
pSelect
->
pProjectionList
);
continue
;
}
}
else
if
(
isTableStar
(
pNode
))
{
SNodeList
*
pCols
=
NULL
;
code
=
createTableAllCols
(
pCxt
,
(
SColumnNode
*
)
pNode
,
&
pCols
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
INSERT_LIST
(
pSelect
->
pProjectionList
,
pCols
);
ERASE_NODE
(
pSelect
->
pProjectionList
);
continue
;
}
SNode
*
pNode
=
NULL
;
WHERE_EACH
(
pNode
,
pSelect
->
pProjectionList
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
(
isStar
(
pNode
))
{
SNodeList
*
pCols
=
NULL
;
code
=
createAllColumns
(
pCxt
,
&
pCols
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
INSERT_LIST
(
pSelect
->
pProjectionList
,
pCols
);
ERASE_NODE
(
pSelect
->
pProjectionList
);
continue
;
}
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
return
code
;
}
else
if
(
isMultiResFunc
(
pNode
))
{
SNodeList
*
pFuncs
=
NULL
;
code
=
createMultiResFuncsFromStar
(
pCxt
,
(
SFunctionNode
*
)
pNode
,
&
pFuncs
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
INSERT_LIST
(
pSelect
->
pProjectionList
,
pFuncs
);
ERASE_NODE
(
pSelect
->
pProjectionList
);
continue
;
}
}
else
if
(
isTableStar
(
pNode
))
{
SNodeList
*
pCols
=
NULL
;
code
=
createTableAllCols
(
pCxt
,
(
SColumnNode
*
)
pNode
,
&
pCols
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
INSERT_LIST
(
pSelect
->
pProjectionList
,
pCols
);
ERASE_NODE
(
pSelect
->
pProjectionList
);
continue
;
}
WHERE_NEXT
;
}
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
return
code
;
}
WHERE_NEXT
;
}
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -4204,6 +4208,7 @@ static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt*
pReq
->
triggerType
=
pStmt
->
pOptions
->
triggerType
;
pReq
->
maxDelay
=
(
NULL
!=
pStmt
->
pOptions
->
pDelay
?
((
SValueNode
*
)
pStmt
->
pOptions
->
pDelay
)
->
datum
.
i
:
0
);
pReq
->
watermark
=
(
NULL
!=
pStmt
->
pOptions
->
pWatermark
?
((
SValueNode
*
)
pStmt
->
pOptions
->
pWatermark
)
->
datum
.
i
:
0
);
pReq
->
igExpired
=
pStmt
->
pOptions
->
ignoreExpired
;
}
return
code
;
...
...
@@ -4794,6 +4799,15 @@ static const char* getSysTableName(ENodeType type) {
return
NULL
;
}
static
SNode
*
createStarCol
()
{
SColumnNode
*
pCol
=
(
SColumnNode
*
)
nodesMakeNode
(
QUERY_NODE_COLUMN
);
if
(
NULL
==
pCol
)
{
return
NULL
;
}
strcpy
(
pCol
->
colName
,
"*"
);
return
(
SNode
*
)
pCol
;
}
static
int32_t
createSimpleSelectStmt
(
const
char
*
pDb
,
const
char
*
pTable
,
SSelectStmt
**
pStmt
)
{
SSelectStmt
*
pSelect
=
(
SSelectStmt
*
)
nodesMakeNode
(
QUERY_NODE_SELECT_STMT
);
if
(
NULL
==
pSelect
)
{
...
...
@@ -4811,6 +4825,11 @@ static int32_t createSimpleSelectStmt(const char* pDb, const char* pTable, SSele
strcpy
(
pRealTable
->
table
.
tableAlias
,
pTable
);
pSelect
->
pFromTable
=
(
SNode
*
)
pRealTable
;
if
(
TSDB_CODE_SUCCESS
!=
nodesListMakeStrictAppend
(
&
pSelect
->
pProjectionList
,
createStarCol
()))
{
nodesDestroyNode
((
SNode
*
)
pSelect
);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
*
pStmt
=
pSelect
;
return
TSDB_CODE_SUCCESS
;
...
...
@@ -4959,6 +4978,7 @@ static int32_t rewriteShowTableDist(STranslateContext* pCxt, SQuery* pQuery) {
SSelectStmt
*
pStmt
=
NULL
;
int32_t
code
=
createSelectStmtForShowTableDist
((
SShowTableDistributedStmt
*
)
pQuery
->
pRoot
,
&
pStmt
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
NODES_DESTORY_LIST
(
pStmt
->
pProjectionList
);
code
=
nodesListMakeStrictAppend
(
&
pStmt
->
pProjectionList
,
createBlockDistFunc
());
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
...
...
source/libs/parser/src/sql.c
浏览文件 @
8c2e9842
此差异已折叠。
点击以展开。
source/libs/parser/test/parInitialCTest.cpp
浏览文件 @
8c2e9842
...
...
@@ -526,20 +526,22 @@ TEST_F(ParserInitialCTest, createStream) {
memset
(
&
expect
,
0
,
sizeof
(
SCMCreateStreamReq
));
};
auto
setCreateStreamReqFunc
=
[
&
](
const
char
*
pStream
,
const
char
*
pSrcDb
,
const
char
*
pSql
,
const
char
*
pDstStb
=
nullptr
,
int8_t
igExists
=
0
,
int8_t
triggerType
=
STREAM_TRIGGER_AT_ONCE
,
int64_t
maxDelay
=
0
,
int64_t
watermark
=
0
)
{
snprintf
(
expect
.
name
,
sizeof
(
expect
.
name
),
"0.%s"
,
pStream
);
snprintf
(
expect
.
sourceDB
,
sizeof
(
expect
.
sourceDB
),
"0.%s"
,
pSrcDb
);
if
(
NULL
!=
pDstStb
)
{
snprintf
(
expect
.
targetStbFullName
,
sizeof
(
expect
.
targetStbFullName
),
"0.test.%s"
,
pDstStb
);
}
expect
.
igExists
=
igExists
;
expect
.
sql
=
strdup
(
pSql
);
expect
.
triggerType
=
triggerType
;
expect
.
maxDelay
=
maxDelay
;
expect
.
watermark
=
watermark
;
};
auto
setCreateStreamReqFunc
=
[
&
](
const
char
*
pStream
,
const
char
*
pSrcDb
,
const
char
*
pSql
,
const
char
*
pDstStb
=
nullptr
,
int8_t
igExists
=
0
,
int8_t
triggerType
=
STREAM_TRIGGER_AT_ONCE
,
int64_t
maxDelay
=
0
,
int64_t
watermark
=
0
,
int8_t
igExpired
=
0
)
{
snprintf
(
expect
.
name
,
sizeof
(
expect
.
name
),
"0.%s"
,
pStream
);
snprintf
(
expect
.
sourceDB
,
sizeof
(
expect
.
sourceDB
),
"0.%s"
,
pSrcDb
);
if
(
NULL
!=
pDstStb
)
{
snprintf
(
expect
.
targetStbFullName
,
sizeof
(
expect
.
targetStbFullName
),
"0.test.%s"
,
pDstStb
);
}
expect
.
igExists
=
igExists
;
expect
.
sql
=
strdup
(
pSql
);
expect
.
triggerType
=
triggerType
;
expect
.
maxDelay
=
maxDelay
;
expect
.
watermark
=
watermark
;
expect
.
igExpired
=
igExpired
;
};
setCheckDdlFunc
([
&
](
const
SQuery
*
pQuery
,
ParserStage
stage
)
{
ASSERT_EQ
(
nodeType
(
pQuery
->
pRoot
),
QUERY_NODE_CREATE_STREAM_STMT
);
...
...
@@ -555,6 +557,7 @@ TEST_F(ParserInitialCTest, createStream) {
ASSERT_EQ
(
req
.
triggerType
,
expect
.
triggerType
);
ASSERT_EQ
(
req
.
maxDelay
,
expect
.
maxDelay
);
ASSERT_EQ
(
req
.
watermark
,
expect
.
watermark
);
ASSERT_EQ
(
req
.
igExpired
,
expect
.
igExpired
);
tFreeSCMCreateStreamReq
(
&
req
);
});
...
...
@@ -571,9 +574,10 @@ TEST_F(ParserInitialCTest, createStream) {
clearCreateStreamReq
();
setCreateStreamReqFunc
(
"s1"
,
"test"
,
"create stream if not exists s1 trigger max_delay 20s watermark 10s into st1 as select * from t1"
,
"st1"
,
1
,
STREAM_TRIGGER_MAX_DELAY
,
20
*
MILLISECOND_PER_SECOND
,
10
*
MILLISECOND_PER_SECOND
);
run
(
"CREATE STREAM IF NOT EXISTS s1 TRIGGER MAX_DELAY 20s WATERMARK 10s INTO st1 AS SELECT * FROM t1"
);
"s1"
,
"test"
,
"create stream if not exists s1 trigger max_delay 20s watermark 10s ignore expired into st1 as select * from t1"
,
"st1"
,
1
,
STREAM_TRIGGER_MAX_DELAY
,
20
*
MILLISECOND_PER_SECOND
,
10
*
MILLISECOND_PER_SECOND
,
1
);
run
(
"CREATE STREAM IF NOT EXISTS s1 TRIGGER MAX_DELAY 20s WATERMARK 10s IGNORE EXPIRED INTO st1 AS SELECT * FROM t1"
);
clearCreateStreamReq
();
}
...
...
source/libs/planner/src/planLogicCreater.c
浏览文件 @
8c2e9842
...
...
@@ -614,10 +614,7 @@ static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStm
if
(
pCxt
->
pPlanCxt
->
streamQuery
)
{
pWindow
->
triggerType
=
pCxt
->
pPlanCxt
->
triggerType
;
pWindow
->
watermark
=
pCxt
->
pPlanCxt
->
watermark
;
}
if
(
pCxt
->
pPlanCxt
->
rSmaQuery
)
{
/*pWindow->filesFactor = pCxt->pPlanCxt->filesFactor;*/
pWindow
->
igExpired
=
pCxt
->
pPlanCxt
->
igExpired
;
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
...
...
source/libs/planner/src/planOptimizer.c
浏览文件 @
8c2e9842
...
...
@@ -227,8 +227,7 @@ static void scanPathOptSetScanWin(SScanLogicNode* pScan) {
pScan
->
slidingUnit
=
((
SWindowLogicNode
*
)
pParent
)
->
slidingUnit
;
pScan
->
triggerType
=
((
SWindowLogicNode
*
)
pParent
)
->
triggerType
;
pScan
->
watermark
=
((
SWindowLogicNode
*
)
pParent
)
->
watermark
;
pScan
->
tsColId
=
((
SColumnNode
*
)((
SWindowLogicNode
*
)
pParent
)
->
pTspk
)
->
colId
;
pScan
->
filesFactor
=
((
SWindowLogicNode
*
)
pParent
)
->
filesFactor
;
pScan
->
igExpired
=
((
SWindowLogicNode
*
)
pParent
)
->
igExpired
;
}
}
...
...
source/libs/planner/src/planPhysiCreater.c
浏览文件 @
8c2e9842
...
...
@@ -534,8 +534,7 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp
pTableScan
->
slidingUnit
=
pScanLogicNode
->
slidingUnit
;
pTableScan
->
triggerType
=
pScanLogicNode
->
triggerType
;
pTableScan
->
watermark
=
pScanLogicNode
->
watermark
;
pTableScan
->
tsColId
=
pScanLogicNode
->
tsColId
;
pTableScan
->
filesFactor
=
pScanLogicNode
->
filesFactor
;
pTableScan
->
igExpired
=
pScanLogicNode
->
igExpired
;
return
createScanPhysiNodeFinalize
(
pCxt
,
pSubplan
,
pScanLogicNode
,
(
SScanPhysiNode
*
)
pTableScan
,
pPhyNode
);
}
...
...
@@ -1054,7 +1053,7 @@ static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList*
pWindow
->
triggerType
=
pWindowLogicNode
->
triggerType
;
pWindow
->
watermark
=
pWindowLogicNode
->
watermark
;
pWindow
->
filesFactor
=
pWindowLogicNode
->
filesFactor
;
pWindow
->
igExpired
=
pWindowLogicNode
->
igExpired
;
if
(
TSDB_CODE_SUCCESS
==
code
)
{
*
pPhyNode
=
(
SPhysiNode
*
)
pWindow
;
...
...
tests/test/c/tmqSim.c
浏览文件 @
8c2e9842
...
...
@@ -431,7 +431,7 @@ static void dumpToFileForCheck(TdFilePtr pFile, TAOS_ROW row, TAOS_FIELD* fields
int32_t
precision
)
{
for
(
int32_t
i
=
0
;
i
<
num_fields
;
i
++
)
{
if
(
i
>
0
)
{
taosFprintfFile
(
pFile
,
"
\n
"
);
taosFprintfFile
(
pFile
,
"
,
"
);
}
shellDumpFieldToFile
(
pFile
,
(
const
char
*
)
row
[
i
],
fields
+
i
,
length
[
i
],
precision
);
}
...
...
@@ -521,15 +521,15 @@ int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) {
static
int32_t
g_once_commit_flag
=
0
;
static
void
tmq_commit_cb_print
(
tmq_t
*
tmq
,
int32_t
code
,
void
*
param
)
{
pError
(
"tmq_commit_cb_print() commit %d
\n
"
,
code
);
pError
(
"tmq_commit_cb_print() commit %d
\n
"
,
code
);
if
(
0
==
g_once_commit_flag
)
{
g_once_commit_flag
=
1
;
notifyMainScript
((
SThreadInfo
*
)
param
,
(
int32_t
)
NOTIFY_CMD_START_COMMIT
);
if
(
0
==
g_once_commit_flag
)
{
g_once_commit_flag
=
1
;
notifyMainScript
((
SThreadInfo
*
)
param
,
(
int32_t
)
NOTIFY_CMD_START_COMMIT
);
}
char
tmpString
[
128
];
taosFprintfFile
(
g_fp
,
"%s tmq_commit_cb_print() be called
\n
"
,
getCurrentTimeString
(
tmpString
));
char
tmpString
[
128
];
taosFprintfFile
(
g_fp
,
"%s tmq_commit_cb_print() be called
\n
"
,
getCurrentTimeString
(
tmpString
));
}
void
build_consumer
(
SThreadInfo
*
pInfo
)
{
...
...
@@ -633,8 +633,8 @@ void loop_consume(SThreadInfo* pInfo) {
}
}
uint64_t
lastPrintTime
=
taosGetTimestampMs
();
uint64_t
startTs
=
taosGetTimestampMs
();
uint64_t
lastPrintTime
=
taosGetTimestampMs
();
uint64_t
startTs
=
taosGetTimestampMs
();
int32_t
consumeDelay
=
g_stConfInfo
.
consumeDelay
==
-
1
?
-
1
:
(
g_stConfInfo
.
consumeDelay
*
1000
);
while
(
running
)
{
...
...
@@ -647,15 +647,14 @@ void loop_consume(SThreadInfo* pInfo) {
taos_free_result
(
tmqMsg
);
totalMsgs
++
;
int64_t
currentPrintTime
=
taosGetTimestampMs
();
if
(
currentPrintTime
-
lastPrintTime
>
10
*
1000
)
{
taosFprintfFile
(
g_fp
,
"consumer id %d has currently poll total msgs: %"
PRId64
"
\n
"
,
pInfo
->
consumerId
,
totalMsgs
);
lastPrintTime
=
currentPrintTime
;
}
int64_t
currentPrintTime
=
taosGetTimestampMs
();
if
(
currentPrintTime
-
lastPrintTime
>
10
*
1000
)
{
taosFprintfFile
(
g_fp
,
"consumer id %d has currently poll total msgs: %"
PRId64
"
\n
"
,
pInfo
->
consumerId
,
totalMsgs
);
lastPrintTime
=
currentPrintTime
;
}
if
(
0
==
once_flag
)
{
once_flag
=
1
;
notifyMainScript
(
pInfo
,
NOTIFY_CMD_START_CONSUM
);
...
...
@@ -672,7 +671,7 @@ void loop_consume(SThreadInfo* pInfo) {
break
;
}
}
if
(
0
==
running
)
{
taosFprintfFile
(
g_fp
,
"receive stop signal and not continue consume
\n
"
);
}
...
...
@@ -882,15 +881,14 @@ int main(int32_t argc, char* argv[]) {
int64_t
t
=
end
-
start
;
if
(
0
==
t
)
t
=
1
;
double
tInMs
=
(
double
)
t
/
1000000
.
0
;
taosFprintfFile
(
g_fp
,
"Spent %.4f seconds to poll msgs: %"
PRIu64
" with %d thread(s), throughput: %.2f msgs/second
\n\n
"
,
tInMs
,
totalMsgs
,
g_stConfInfo
.
numOfThread
,
(
double
)(
totalMsgs
/
tInMs
));
"Spent %.4f seconds to poll msgs: %"
PRIu64
" with %d thread(s), throughput: %.2f msgs/second
\n\n
"
,
tInMs
,
totalMsgs
,
g_stConfInfo
.
numOfThread
,
(
double
)(
totalMsgs
/
tInMs
));
taosFprintfFile
(
g_fp
,
"==== close tmqlog ====
\n
"
);
taosCloseFile
(
&
g_fp
);
return
0
;
}
tools/shell/src/shellEngine.c
浏览文件 @
8c2e9842
...
...
@@ -364,7 +364,7 @@ int32_t shellDumpResultToFile(const char *fname, TAOS_RES *tres) {
int32_t
*
length
=
taos_fetch_lengths
(
tres
);
for
(
int32_t
i
=
0
;
i
<
num_fields
;
i
++
)
{
if
(
i
>
0
)
{
taosFprintfFile
(
pFile
,
"
\n
"
);
taosFprintfFile
(
pFile
,
"
,
"
);
}
shellDumpFieldToFile
(
pFile
,
(
const
char
*
)
row
[
i
],
fields
+
i
,
length
[
i
],
precision
);
}
...
...
@@ -394,9 +394,9 @@ void shellPrintNChar(const char *str, int32_t length, int32_t width) {
break
;
}
int
w
=
0
;
if
(
*
(
str
+
pos
)
==
'\t'
||
*
(
str
+
pos
)
==
'\n'
||
*
(
str
+
pos
)
==
'\r'
)
{
if
(
*
(
str
+
pos
)
==
'\t'
||
*
(
str
+
pos
)
==
'\n'
||
*
(
str
+
pos
)
==
'\r'
)
{
w
=
bytes
;
}
else
{
}
else
{
w
=
taosWcharWidth
(
wc
);
}
pos
+=
bytes
;
...
...
@@ -513,7 +513,7 @@ void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t
}
bool
shellIsLimitQuery
(
const
char
*
sql
)
{
//todo refactor
//
todo refactor
if
(
taosStrCaseStr
(
sql
,
" limit "
)
!=
NULL
)
{
return
true
;
}
...
...
@@ -522,7 +522,7 @@ bool shellIsLimitQuery(const char *sql) {
}
bool
shellIsShowQuery
(
const
char
*
sql
)
{
//todo refactor
//
todo refactor
if
(
taosStrCaseStr
(
sql
,
"show "
)
!=
NULL
)
{
return
true
;
}
...
...
@@ -530,7 +530,6 @@ bool shellIsShowQuery(const char *sql) {
return
false
;
}
int32_t
shellVerticalPrintResult
(
TAOS_RES
*
tres
,
const
char
*
sql
)
{
TAOS_ROW
row
=
taos_fetch_row
(
tres
);
if
(
row
==
NULL
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录