Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
eb4efcb5
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
eb4efcb5
编写于
11月 24, 2022
作者:
D
dapan1121
提交者:
GitHub
11月 24, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #18400 from taosdata/fix/TD-20580
fix:initialize maxts
上级
a67374fb
b27faba7
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
68 addition
and
9 deletion
+68
-9
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+1
-1
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+7
-8
tests/script/tsim/stream/state0.sim
tests/script/tsim/stream/state0.sim
+60
-0
未找到文件。
source/libs/executor/src/scanoperator.c
浏览文件 @
eb4efcb5
...
@@ -1849,12 +1849,12 @@ FETCH_NEXT_BLOCK:
...
@@ -1849,12 +1849,12 @@ FETCH_NEXT_BLOCK:
prepareRangeScan
(
pInfo
,
pInfo
->
pUpdateRes
,
&
pInfo
->
updateResIndex
);
prepareRangeScan
(
pInfo
,
pInfo
->
pUpdateRes
,
&
pInfo
->
updateResIndex
);
copyDataBlock
(
pInfo
->
pDeleteDataRes
,
pInfo
->
pUpdateRes
);
copyDataBlock
(
pInfo
->
pDeleteDataRes
,
pInfo
->
pUpdateRes
);
pInfo
->
pDeleteDataRes
->
info
.
type
=
STREAM_DELETE_DATA
;
pInfo
->
pDeleteDataRes
->
info
.
type
=
STREAM_DELETE_DATA
;
pInfo
->
scanMode
=
STREAM_SCAN_FROM_DATAREADER_RANGE
;
printDataBlock
(
pDelBlock
,
"stream scan delete data"
);
printDataBlock
(
pDelBlock
,
"stream scan delete data"
);
if
(
pInfo
->
tqReader
)
{
if
(
pInfo
->
tqReader
)
{
blockDataDestroy
(
pDelBlock
);
blockDataDestroy
(
pDelBlock
);
}
}
if
(
pInfo
->
pDeleteDataRes
->
info
.
rows
>
0
)
{
if
(
pInfo
->
pDeleteDataRes
->
info
.
rows
>
0
)
{
pInfo
->
scanMode
=
STREAM_SCAN_FROM_DATAREADER_RANGE
;
return
pInfo
->
pDeleteDataRes
;
return
pInfo
->
pDeleteDataRes
;
}
else
{
}
else
{
goto
FETCH_NEXT_BLOCK
;
goto
FETCH_NEXT_BLOCK
;
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
eb4efcb5
...
@@ -3364,22 +3364,23 @@ void initDummyFunction(SqlFunctionCtx* pDummy, SqlFunctionCtx* pCtx, int32_t num
...
@@ -3364,22 +3364,23 @@ void initDummyFunction(SqlFunctionCtx* pDummy, SqlFunctionCtx* pCtx, int32_t num
}
}
}
}
void
initDownStream
(
SOperatorInfo
*
downstream
,
SStreamAggSupporter
*
pAggSup
,
int64_t
waterMark
,
uint16_t
type
,
void
initDownStream
(
SOperatorInfo
*
downstream
,
SStreamAggSupporter
*
pAggSup
,
uint16_t
type
,
int32_t
tsColIndex
,
int32_t
tsColIndex
)
{
STimeWindowAggSupp
*
pTwSup
)
{
if
(
downstream
->
operatorType
==
QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION
)
{
if
(
downstream
->
operatorType
==
QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION
)
{
SStreamPartitionOperatorInfo
*
pScanInfo
=
downstream
->
info
;
SStreamPartitionOperatorInfo
*
pScanInfo
=
downstream
->
info
;
pScanInfo
->
tsColIndex
=
tsColIndex
;
pScanInfo
->
tsColIndex
=
tsColIndex
;
}
}
if
(
downstream
->
operatorType
!=
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
)
{
if
(
downstream
->
operatorType
!=
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
)
{
initDownStream
(
downstream
->
pDownstream
[
0
],
pAggSup
,
waterMark
,
type
,
tsColIndex
);
initDownStream
(
downstream
->
pDownstream
[
0
],
pAggSup
,
type
,
tsColIndex
,
pTwSup
);
return
;
return
;
}
}
SStreamScanInfo
*
pScanInfo
=
downstream
->
info
;
SStreamScanInfo
*
pScanInfo
=
downstream
->
info
;
pScanInfo
->
windowSup
=
(
SWindowSupporter
){.
pStreamAggSup
=
pAggSup
,
.
gap
=
pAggSup
->
gap
,
.
parentType
=
type
};
pScanInfo
->
windowSup
=
(
SWindowSupporter
){.
pStreamAggSup
=
pAggSup
,
.
gap
=
pAggSup
->
gap
,
.
parentType
=
type
};
if
(
!
pScanInfo
->
pUpdateInfo
)
{
if
(
!
pScanInfo
->
pUpdateInfo
)
{
pScanInfo
->
pUpdateInfo
=
updateInfoInit
(
60000
,
TSDB_TIME_PRECISION_MILLI
,
waterMark
);
pScanInfo
->
pUpdateInfo
=
updateInfoInit
(
60000
,
TSDB_TIME_PRECISION_MILLI
,
pTwSup
->
waterMark
);
}
}
pScanInfo
->
twAggSup
=
*
pTwSup
;
}
}
int32_t
initStreamAggSupporter
(
SStreamAggSupporter
*
pSup
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
int64_t
gap
,
int32_t
initStreamAggSupporter
(
SStreamAggSupporter
*
pSup
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
int64_t
gap
,
...
@@ -4102,8 +4103,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
...
@@ -4102,8 +4103,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
createOperatorFpSet
(
operatorDummyOpenFn
,
doStreamSessionAgg
,
NULL
,
destroyStreamSessionAggOperatorInfo
,
NULL
);
createOperatorFpSet
(
operatorDummyOpenFn
,
doStreamSessionAgg
,
NULL
,
destroyStreamSessionAggOperatorInfo
,
NULL
);
if
(
downstream
)
{
if
(
downstream
)
{
initDownStream
(
downstream
,
&
pInfo
->
streamAggSup
,
pInfo
->
twAggSup
.
waterMark
,
pOperator
->
operatorType
,
initDownStream
(
downstream
,
&
pInfo
->
streamAggSup
,
pOperator
->
operatorType
,
pInfo
->
primaryTsIndex
,
&
pInfo
->
twAggSup
);
pInfo
->
primaryTsIndex
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
}
}
return
pOperator
;
return
pOperator
;
...
@@ -4606,8 +4606,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
...
@@ -4606,8 +4606,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo
,
pTaskInfo
);
pInfo
,
pTaskInfo
);
pOperator
->
fpSet
=
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doStreamStateAgg
,
NULL
,
destroyStreamStateOperatorInfo
,
NULL
);
createOperatorFpSet
(
operatorDummyOpenFn
,
doStreamStateAgg
,
NULL
,
destroyStreamStateOperatorInfo
,
NULL
);
initDownStream
(
downstream
,
&
pInfo
->
streamAggSup
,
pInfo
->
twAggSup
.
waterMark
,
pOperator
->
operatorType
,
initDownStream
(
downstream
,
&
pInfo
->
streamAggSup
,
pOperator
->
operatorType
,
pInfo
->
primaryTsIndex
,
&
pInfo
->
twAggSup
);
pInfo
->
primaryTsIndex
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
goto
_error
;
...
...
tests/script/tsim/stream/state0.sim
浏览文件 @
eb4efcb5
...
@@ -731,5 +731,65 @@ if $data32 != 1 then
...
@@ -731,5 +731,65 @@ if $data32 != 1 then
goto loop9
goto loop9
endi
endi
sql drop stream if exists streams5;
sql drop database if exists test5;
sql create database test5;
sql use test5;
sql create table tb (ts timestamp, a int);
sql insert into tb values (now + 1m , 1 );
sql create table b (c timestamp, d int, e int , f int, g double);
sql create stream streams0 trigger at_once into streamt as select _wstart c1, count(*) c2, max(a) c3 from tb state_window(a);
sql insert into b values(1648791213000,NULL,NULL,NULL,NULL);
sql select * from streamt order by c1, c2, c3;
print data00:$data00
print data01:$data01
sql insert into b values(1648791213000,NULL,NULL,NULL,NULL);
sql select * from streamt order by c1, c2, c3;
print data00:$data00
print data01:$data01
sql insert into b values(1648791213001,1,2,2,2.0);
sql insert into b values(1648791213002,1,3,3,3.0);
sql insert into tb values(1648791213003,1);
sql select * from streamt;
print data00:$data00
print data01:$data01
sql delete from b where c >= 1648791213001 and c <= 1648791213002;
sql insert into b values(1648791223003,2,2,3,1.0); insert into b values(1648791223002,2,3,3,3.0);
sql insert into tb values (now + 1m , 1 );
sql select * from streamt;
print data00:$data00
print data01:$data01
sql insert into b(c,d) values (now + 6m , 6 );
sql delete from b where c >= 1648791213001 and c <= 1648791233005;;
$loop_count = 0
loop10:
sleep 200
sql select c2 from streamt;
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
if $rows != 1 then
print =====rows=$rows
goto loop10
endi
if $data00 != 2 then
print =====data00=$data00
goto loop10
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s stop -x SIGINT
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录