Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
4b564b2e
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
4b564b2e
编写于
5月 13, 2022
作者:
5
54liuyao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
add stream ci
上级
72076753
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
355 addition
and
19 deletion
+355
-19
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+1
-1
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+1
-12
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+5
-4
source/libs/planner/src/planOptimizer.c
source/libs/planner/src/planOptimizer.c
+3
-1
source/libs/stream/src/tstream.c
source/libs/stream/src/tstream.c
+1
-1
tests/script/jenkins/basic.txt
tests/script/jenkins/basic.txt
+1
-0
tests/script/tsim/tstream/basic1.sim
tests/script/tsim/tstream/basic1.sim
+343
-0
未找到文件。
source/libs/executor/inc/executorimpl.h
浏览文件 @
4b564b2e
...
@@ -706,7 +706,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
...
@@ -706,7 +706,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
SOperatorInfo
*
createDataBlockInfoScanOperator
(
void
*
dataReader
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createDataBlockInfoScanOperator
(
void
*
dataReader
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createStreamScanOperatorInfo
(
void
*
streamReadHandle
,
void
*
pDataReader
,
SSDataBlock
*
pResBlock
,
SOperatorInfo
*
createStreamScanOperatorInfo
(
void
*
streamReadHandle
,
void
*
pDataReader
,
SSDataBlock
*
pResBlock
,
SArray
*
pColList
,
SArray
*
pTableIdList
,
SExecTaskInfo
*
pTaskInfo
,
SArray
*
pColList
,
SArray
*
pTableIdList
,
SExecTaskInfo
*
pTaskInfo
,
SNode
*
pConditions
,
SOperatorInfo
*
pOperatorDumy
,
SInterval
*
pInterval
);
SNode
*
pConditions
,
SOperatorInfo
*
pOperatorDumy
);
SOperatorInfo
*
createFillOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExpr
,
int32_t
numOfCols
,
SOperatorInfo
*
createFillOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExpr
,
int32_t
numOfCols
,
SInterval
*
pInterval
,
STimeWindow
*
pWindow
,
SSDataBlock
*
pResBlock
,
int32_t
fillType
,
SNodeListNode
*
fillVal
,
SInterval
*
pInterval
,
STimeWindow
*
pWindow
,
SSDataBlock
*
pResBlock
,
int32_t
fillType
,
SNodeListNode
*
fillVal
,
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
4b564b2e
...
@@ -4808,17 +4808,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
...
@@ -4808,17 +4808,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
}
}
SDataBlockDescNode
*
pDescNode
=
pScanPhyNode
->
node
.
pOutputDataBlockDesc
;
SDataBlockDescNode
*
pDescNode
=
pScanPhyNode
->
node
.
pOutputDataBlockDesc
;
SArray
*
pColList
=
extractColMatchInfo
(
pScanPhyNode
->
pScanCols
,
pDescNode
,
&
numOfCols
,
COL_MATCH_FROM_COL_ID
);
SSDataBlock
*
pResBlockDumy
=
createResDataBlock
(
pDescNode
);
SQueryTableDataCond
cond
=
{
0
};
int32_t
code
=
initQueryTableDataCond
(
&
cond
,
pTableScanNode
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
NULL
;
}
SInterval
interval
=
extractIntervalInfo
(
pTableScanNode
);
SOperatorInfo
*
pOperatorDumy
=
createTableScanOperatorInfo
(
pTableScanNode
,
pDataReader
,
pHandle
,
pTaskInfo
);
SOperatorInfo
*
pOperatorDumy
=
createTableScanOperatorInfo
(
pTableScanNode
,
pDataReader
,
pHandle
,
pTaskInfo
);
SArray
*
tableIdList
=
extractTableIdList
(
pTableGroupInfo
);
SArray
*
tableIdList
=
extractTableIdList
(
pTableGroupInfo
);
...
@@ -4826,7 +4815,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
...
@@ -4826,7 +4815,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SArray
*
pCols
=
extractColMatchInfo
(
pScanPhyNode
->
pScanCols
,
pDescNode
,
&
numOfCols
,
COL_MATCH_FROM_COL_ID
);
SArray
*
pCols
=
extractColMatchInfo
(
pScanPhyNode
->
pScanCols
,
pDescNode
,
&
numOfCols
,
COL_MATCH_FROM_COL_ID
);
SOperatorInfo
*
pOperator
=
createStreamScanOperatorInfo
(
pHandle
->
reader
,
pDataReader
,
pResBlock
,
pCols
,
tableIdList
,
pTaskInfo
,
SOperatorInfo
*
pOperator
=
createStreamScanOperatorInfo
(
pHandle
->
reader
,
pDataReader
,
pResBlock
,
pCols
,
tableIdList
,
pTaskInfo
,
pScanPhyNode
->
node
.
pConditions
,
pOperatorDumy
,
&
interval
);
pScanPhyNode
->
node
.
pConditions
,
pOperatorDumy
);
taosArrayDestroy
(
tableIdList
);
taosArrayDestroy
(
tableIdList
);
return
pOperator
;
return
pOperator
;
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN
==
type
)
{
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN
==
type
)
{
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
4b564b2e
...
@@ -782,8 +782,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
...
@@ -782,8 +782,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
SOperatorInfo
*
createStreamScanOperatorInfo
(
void
*
streamReadHandle
,
void
*
pDataReader
,
SOperatorInfo
*
createStreamScanOperatorInfo
(
void
*
streamReadHandle
,
void
*
pDataReader
,
SSDataBlock
*
pResBlock
,
SArray
*
pColList
,
SArray
*
pTableIdList
,
SSDataBlock
*
pResBlock
,
SArray
*
pColList
,
SArray
*
pTableIdList
,
SExecTaskInfo
*
pTaskInfo
,
SNode
*
pCondition
,
SOperatorInfo
*
pOperatorDumy
,
SExecTaskInfo
*
pTaskInfo
,
SNode
*
pCondition
,
SOperatorInfo
*
pOperatorDumy
)
{
SInterval
*
pInterval
)
{
SStreamBlockScanInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamBlockScanInfo
));
SStreamBlockScanInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamBlockScanInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
...
@@ -791,6 +790,8 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataR
...
@@ -791,6 +790,8 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataR
goto
_error
;
goto
_error
;
}
}
STableScanInfo
*
pSTInfo
=
(
STableScanInfo
*
)
pOperatorDumy
->
info
;
int32_t
numOfOutput
=
taosArrayGetSize
(
pColList
);
int32_t
numOfOutput
=
taosArrayGetSize
(
pColList
);
SArray
*
pColIds
=
taosArrayInit
(
4
,
sizeof
(
int16_t
));
SArray
*
pColIds
=
taosArrayInit
(
4
,
sizeof
(
int16_t
));
...
@@ -823,7 +824,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataR
...
@@ -823,7 +824,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataR
}
}
pInfo
->
primaryTsIndex
=
0
;
// TODO(liuyao) get it from physical plan
pInfo
->
primaryTsIndex
=
0
;
// TODO(liuyao) get it from physical plan
pInfo
->
pUpdateInfo
=
updateInfoInitP
(
pI
nterval
,
10000
);
// TODO(liuyao) get watermark from physical plan
pInfo
->
pUpdateInfo
=
updateInfoInitP
(
&
pSTInfo
->
i
nterval
,
10000
);
// TODO(liuyao) get watermark from physical plan
if
(
pInfo
->
pUpdateInfo
==
NULL
)
{
if
(
pInfo
->
pUpdateInfo
==
NULL
)
{
taosMemoryFreeClear
(
pInfo
);
taosMemoryFreeClear
(
pInfo
);
taosMemoryFreeClear
(
pOperator
);
taosMemoryFreeClear
(
pOperator
);
...
@@ -836,7 +837,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataR
...
@@ -836,7 +837,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataR
pInfo
->
pDataReader
=
pDataReader
;
pInfo
->
pDataReader
=
pDataReader
;
pInfo
->
scanMode
=
STREAM_SCAN_FROM_READERHANDLE
;
pInfo
->
scanMode
=
STREAM_SCAN_FROM_READERHANDLE
;
pInfo
->
pOperatorDumy
=
pOperatorDumy
;
pInfo
->
pOperatorDumy
=
pOperatorDumy
;
pInfo
->
interval
=
*
pI
nterval
;
pInfo
->
interval
=
pSTInfo
->
i
nterval
;
pOperator
->
name
=
"StreamBlockScanOperator"
;
pOperator
->
name
=
"StreamBlockScanOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
;
...
...
source/libs/planner/src/planOptimizer.c
浏览文件 @
4b564b2e
...
@@ -228,10 +228,12 @@ static void setScanWindowInfo(SScanLogicNode* pScan) {
...
@@ -228,10 +228,12 @@ static void setScanWindowInfo(SScanLogicNode* pScan) {
static
int32_t
osdOptimize
(
SOptimizeContext
*
pCxt
,
SLogicNode
*
pLogicNode
)
{
static
int32_t
osdOptimize
(
SOptimizeContext
*
pCxt
,
SLogicNode
*
pLogicNode
)
{
SOsdInfo
info
=
{
0
};
SOsdInfo
info
=
{
0
};
int32_t
code
=
osdMatch
(
pCxt
,
pLogicNode
,
&
info
);
int32_t
code
=
osdMatch
(
pCxt
,
pLogicNode
,
&
info
);
if
(
TSDB_CODE_SUCCESS
==
code
&&
info
.
pScan
)
{
setScanWindowInfo
((
SScanLogicNode
*
)
info
.
pScan
);
}
if
(
TSDB_CODE_SUCCESS
==
code
&&
(
NULL
!=
info
.
pDsoFuncs
||
NULL
!=
info
.
pSdrFuncs
))
{
if
(
TSDB_CODE_SUCCESS
==
code
&&
(
NULL
!=
info
.
pDsoFuncs
||
NULL
!=
info
.
pSdrFuncs
))
{
info
.
pScan
->
dataRequired
=
osdGetDataRequired
(
info
.
pSdrFuncs
);
info
.
pScan
->
dataRequired
=
osdGetDataRequired
(
info
.
pSdrFuncs
);
info
.
pScan
->
pDynamicScanFuncs
=
info
.
pDsoFuncs
;
info
.
pScan
->
pDynamicScanFuncs
=
info
.
pDsoFuncs
;
setScanWindowInfo
((
SScanLogicNode
*
)
info
.
pScan
);
OPTIMIZE_FLAG_SET_MASK
(
info
.
pScan
->
node
.
optimizedFlag
,
OPTIMIZE_FLAG_OSD
);
OPTIMIZE_FLAG_SET_MASK
(
info
.
pScan
->
node
.
optimizedFlag
,
OPTIMIZE_FLAG_OSD
);
pCxt
->
optimized
=
true
;
pCxt
->
optimized
=
true
;
}
}
...
...
source/libs/stream/src/tstream.c
浏览文件 @
4b564b2e
...
@@ -154,7 +154,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
...
@@ -154,7 +154,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
// sink
// sink
if
(
pTask
->
sinkType
==
TASK_SINK__TABLE
)
{
if
(
pTask
->
sinkType
==
TASK_SINK__TABLE
)
{
blockDebugShowData
(
pRes
);
//
blockDebugShowData(pRes);
pTask
->
tbSink
.
tbSinkFunc
(
pTask
,
pTask
->
tbSink
.
vnode
,
0
,
pRes
);
pTask
->
tbSink
.
tbSinkFunc
(
pTask
,
pTask
->
tbSink
.
vnode
,
0
,
pRes
);
}
else
if
(
pTask
->
sinkType
==
TASK_SINK__SMA
)
{
}
else
if
(
pTask
->
sinkType
==
TASK_SINK__SMA
)
{
pTask
->
smaSink
.
smaSink
(
pTask
->
ahandle
,
pTask
->
smaSink
.
smaId
,
pRes
);
pTask
->
smaSink
.
smaSink
(
pTask
->
ahandle
,
pTask
->
smaSink
.
smaId
,
pRes
);
...
...
tests/script/jenkins/basic.txt
浏览文件 @
4b564b2e
...
@@ -63,6 +63,7 @@
...
@@ -63,6 +63,7 @@
# ---- tstream
# ---- tstream
./test.sh -f tsim/tstream/basic0.sim
./test.sh -f tsim/tstream/basic0.sim
./test.sh -f tsim/tstream/basic1.sim
# ---- transaction
# ---- transaction
./test.sh -f tsim/trans/create_db.sim
./test.sh -f tsim/trans/create_db.sim
...
...
tests/script/tsim/tstream/basic1.sim
0 → 100644
浏览文件 @
4b564b2e
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print =============== create database
sql create database test vgroups 1
sql show databases
if $rows != 3 then
return -1
endi
print $data00 $data01 $data02
sql use test
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams1 trigger at_once into streamt as select _wstartts, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s);
sql insert into t1 values(1648791213000,1,2,3,1.0);
sql insert into t1 values(1648791223001,2,2,3,1.1);
sql insert into t1 values(1648791233002,3,2,3,2.1);
sql insert into t1 values(1648791243003,4,2,3,3.1);
sql insert into t1 values(1648791213004,4,2,3,4.1);
sleep 1000
sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt;
if $rows != 4 then
print ======$rows
return -1
endi
# row 0
if $data01 != 2 then
print ======$data01
return -1
endi
if $data02 != 2 then
print ======$data02
return -1
endi
if $data03 != 5 then
print ======$data03
return -1
endi
if $data04 != 2 then
print ======$data04
return -1
endi
if $data05 != 3 then
print ======$data05
return -1
endi
# row 1
if $data11 != 1 then
print ======$data11
return -1
endi
if $data12 != 1 then
print ======$data12
return -1
endi
if $data13 != 2 then
print ======$data13
return -1
endi
if $data14 != 2 then
print ======$data14
return -1
endi
if $data15 != 3 then
print ======$data15
return -1
endi
# row 2
if $data21 != 1 then
print ======$data21
return -1
endi
if $data22 != 1 then
print ======$data22
return -1
endi
if $data23 != 3 then
print ======$data23
return -1
endi
if $data24 != 2 then
print ======$data24
return -1
endi
if $data25 != 3 then
print ======$data25
return -1
endi
# row 3
if $data31 != 1 then
print ======$data31
return -1
endi
if $data32 != 1 then
print ======$data32
return -1
endi
if $data33 != 4 then
print ======$data33
return -1
endi
if $data34 != 2 then
print ======$data34
return -1
endi
if $data35 != 3 then
print ======$data35
return -1
endi
sql insert into t1 values(1648791223001,12,14,13,11.1);
sleep 100
sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt;
if $rows != 4 then
print ======$rows
return -1
endi
# row 0
if $data01 != 2 then
print ======$data01
return -1
endi
if $data02 != 2 then
print ======$data02
return -1
endi
if $data03 != 5 then
print ======$data03
return -1
endi
if $data04 != 2 then
print ======$data04
return -1
endi
if $data05 != 3 then
print ======$data05
return -1
endi
# row 1
if $data11 != 1 then
print ======$data11
return -1
endi
if $data12 != 1 then
print ======$data12
return -1
endi
if $data13 != 12 then
print ======$data13
return -1
endi
if $data14 != 14 then
print ======$data14
return -1
endi
if $data15 != 13 then
print ======$data15
return -1
endi
# row 2
if $data21 != 1 then
print ======$data21
return -1
endi
if $data22 != 1 then
print ======$data22
return -1
endi
if $data23 != 3 then
print ======$data23
return -1
endi
if $data24 != 2 then
print ======$data24
return -1
endi
if $data25 != 3 then
print ======$data25
return -1
endi
# row 3
if $data31 != 1 then
print ======$data31
return -1
endi
if $data32 != 1 then
print ======$data32
return -1
endi
if $data33 != 4 then
print ======$data33
return -1
endi
if $data34 != 2 then
print ======$data34
return -1
endi
if $data35 != 3 then
print ======$data35
return -1
endi
sql insert into t1 values(1648791223002,12,14,13,11.1);
sleep 100
sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1
if $data11 != 2 then
print ======$data11
return -1
endi
if $data12 != 2 then
print ======$data12
return -1
endi
if $data13 != 24 then
print ======$data13
return -1
endi
if $data14 != 14 then
print ======$data14
return -1
endi
if $data15 != 13 then
print ======$data15
return -1
endi
sql insert into t1 values(1648791223003,12,14,13,11.1);
sleep 100
sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1
if $data11 != 3 then
print ======$data11
return -1
endi
if $data12 != 3 then
print ======$data12
return -1
endi
if $data13 != 36 then
print ======$data13
return -1
endi
if $data14 != 14 then
print ======$data14
return -1
endi
if $data15 != 13 then
print ======$data15
return -1
endi
sql insert into t1 values(1648791223001,1,1,1,1.1);
sql insert into t1 values(1648791223002,2,2,2,2.1);
sql insert into t1 values(1648791223003,3,3,3,3.1);
sleep 100
sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1
if $data11 != 3 then
print ======$data11
return -1
endi
if $data12 != 3 then
print ======$data12
return -1
endi
if $data13 != 6 then
print ======$data13
return -1
endi
if $data14 != 3 then
print ======$data14
return -1
endi
if $data15 != 1 then
print ======$data15
return -1
endi
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录