Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
f53521cb
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
f53521cb
编写于
6月 24, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
test(stream): partition by tbname
上级
a9606ea0
变更
8
隐藏空白更改
内联
并排
Showing
8 changed file
with
177 addition
and
34 deletion
+177
-34
source/dnode/mnode/impl/src/mndStream.c
source/dnode/mnode/impl/src/mndStream.c
+4
-0
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+3
-0
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+2
-2
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+37
-27
source/libs/stream/src/streamDispatch.c
source/libs/stream/src/streamDispatch.c
+4
-3
source/libs/wal/src/walWrite.c
source/libs/wal/src/walWrite.c
+2
-2
tests/script/jenkins/basic.txt
tests/script/jenkins/basic.txt
+1
-0
tests/script/tsim/stream/partitionby1.sim
tests/script/tsim/stream/partitionby1.sim
+124
-0
未找到文件。
source/dnode/mnode/impl/src/mndStream.c
浏览文件 @
f53521cb
...
...
@@ -321,6 +321,10 @@ FAIL:
}
int32_t
mndPersistTaskDeployReq
(
STrans
*
pTrans
,
const
SStreamTask
*
pTask
)
{
ASSERT
(
pTask
->
isDataScan
==
0
||
pTask
->
isDataScan
==
1
);
if
(
pTask
->
isDataScan
==
0
&&
pTask
->
sinkType
==
TASK_SINK__NONE
)
{
ASSERT
(
taosArrayGetSize
(
pTask
->
childEpInfo
)
!=
0
);
}
SEncoder
encoder
;
tEncoderInit
(
&
encoder
,
NULL
,
0
);
tEncodeSStreamTask
(
&
encoder
,
pTask
);
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
f53521cb
...
...
@@ -455,6 +455,9 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
}
tDecoderClear
(
&
decoder
);
ASSERT
(
pTask
->
isDataScan
==
0
||
pTask
->
isDataScan
==
1
);
if
(
pTask
->
isDataScan
==
0
&&
pTask
->
sinkType
==
TASK_SINK__NONE
)
{
ASSERT
(
taosArrayGetSize
(
pTask
->
childEpInfo
)
!=
0
);
}
pTask
->
execStatus
=
TASK_EXEC_STATUS__IDLE
;
...
...
source/libs/executor/src/executor.c
浏览文件 @
f53521cb
...
...
@@ -145,10 +145,10 @@ static SArray* filterQualifiedChildTables(const SStreamBlockScanInfo* pScanInfo,
continue
;
}
ASSERT
(
mr
.
me
.
type
==
TSDB_CHILD_TABLE
);
if
(
mr
.
me
.
ctbEntry
.
suid
!=
pScanInfo
->
tableUid
)
{
if
(
mr
.
me
.
type
!=
TSDB_CHILD_TABLE
||
mr
.
me
.
ctbEntry
.
suid
!=
pScanInfo
->
tableUid
)
{
continue
;
}
// TODO handle ntb case
taosArrayPush
(
qa
,
id
);
}
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
f53521cb
...
...
@@ -4365,6 +4365,26 @@ _error:
return
NULL
;
}
static
int32_t
extractTbscanInStreamOpTree
(
SOperatorInfo
*
pOperator
,
STableScanInfo
**
ppInfo
)
{
if
(
pOperator
->
operatorType
!=
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
)
{
if
(
pOperator
->
numOfDownstream
==
0
)
{
qError
(
"failed to find stream scan operator"
);
return
TSDB_CODE_QRY_APP_ERROR
;
}
if
(
pOperator
->
numOfDownstream
>
1
)
{
qError
(
"join not supported for stream block scan"
);
return
TSDB_CODE_QRY_APP_ERROR
;
}
return
extractTbscanInStreamOpTree
(
pOperator
->
pDownstream
[
0
],
ppInfo
);
}
else
{
SStreamBlockScanInfo
*
pInfo
=
pOperator
->
info
;
ASSERT
(
pInfo
->
pSnapshotReadOp
->
operatorType
==
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
);
*
ppInfo
=
pInfo
->
pSnapshotReadOp
->
info
;
return
0
;
}
}
int32_t
extractTableScanNode
(
SPhysiNode
*
pNode
,
STableScanPhysiNode
**
ppNode
)
{
if
(
pNode
->
pChildren
==
NULL
||
LIST_LENGTH
(
pNode
->
pChildren
)
==
0
)
{
if
(
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
==
pNode
->
type
)
{
...
...
@@ -4387,37 +4407,27 @@ int32_t extractTableScanNode(SPhysiNode* pNode, STableScanPhysiNode** ppNode) {
return
-
1
;
}
int32_t
doRebuildReader
(
SOperatorInfo
*
pOperator
,
SSubplan
*
plan
,
SReadHandle
*
pHandle
)
{
if
(
pOperator
->
operatorType
!=
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
)
{
if
(
pOperator
->
numOfDownstream
==
0
)
{
qError
(
"failed to find stream scan operator"
);
return
TSDB_CODE_QRY_APP_ERROR
;
}
int32_t
rebuildReader
(
SOperatorInfo
*
pOperator
,
SSubplan
*
plan
,
SReadHandle
*
pHandle
,
int64_t
uid
,
int64_t
ts
)
{
STableScanInfo
*
pTableScanInfo
=
NULL
;
if
(
extractTbscanInStreamOpTree
(
pOperator
,
&
pTableScanInfo
)
<
0
)
{
return
-
1
;
}
if
(
pOperator
->
numOfDownstream
>
1
)
{
qError
(
"join not supported for stream block scan"
);
return
TSDB_CODE_QRY_APP_ERROR
;
}
return
doRebuildReader
(
pOperator
->
pDownstream
[
0
],
plan
,
pHandle
);
}
else
{
SStreamBlockScanInfo
*
pInfo
=
pOperator
->
info
;
ASSERT
(
pInfo
->
pSnapshotReadOp
->
operatorType
==
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
);
STableScanInfo
*
pTableScanInfo
=
pInfo
->
pSnapshotReadOp
->
info
;
STableScanPhysiNode
*
pNode
=
NULL
;
if
(
extractTableScanNode
(
plan
->
pNode
,
&
pNode
)
<
0
)
{
ASSERT
(
0
);
}
tsdbCleanupReadHandle
(
pTableScanInfo
->
dataReader
);
STableScanPhysiNode
*
pNode
=
NULL
;
if
(
extractTableScanNode
(
plan
->
pNode
,
&
pNode
)
<
0
)
{
ASSERT
(
0
);
}
tsdbCleanupReadHandle
(
pTableScanInfo
->
dataReader
);
STableListInfo
info
=
{
0
};
pTableScanInfo
->
dataReader
=
doCreateDataReader
(
pNode
,
pHandle
,
&
info
,
0
,
0
);
if
(
pTableScanInfo
->
dataReader
==
NULL
)
{
ASSERT
(
0
);
qError
(
"failed to create data reader"
);
return
TSDB_CODE_QRY_APP_ERROR
;
}
STableListInfo
info
=
{
0
};
pTableScanInfo
->
dataReader
=
doCreateDataReader
(
pNode
,
pHandle
,
&
info
,
0
,
0
);
if
(
pTableScanInfo
->
dataReader
==
NULL
)
{
ASSERT
(
0
);
qError
(
"failed to create data reader"
);
return
TSDB_CODE_QRY_APP_ERROR
;
}
// TODO: set uid and ts to data reader
return
0
;
}
...
...
source/libs/stream/src/streamDispatch.c
浏览文件 @
f53521cb
...
...
@@ -70,20 +70,21 @@ int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* p
if
(
tEncodeI32
(
pEncoder
,
pReq
->
dstTaskId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pReq
->
srcNodeId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pReq
->
srcTaskId
)
<
0
)
return
-
1
;
if
(
tEncodeBinary
(
pEncoder
,
(
const
uint8_t
*
)
&
pReq
->
pRetrieve
,
pReq
->
retrieveLen
)
<
0
)
return
-
1
;
if
(
tEncodeBinary
(
pEncoder
,
(
const
uint8_t
*
)
pReq
->
pRetrieve
,
pReq
->
retrieveLen
)
<
0
)
return
-
1
;
tEndEncode
(
pEncoder
);
return
pEncoder
->
pos
;
}
int32_t
tDecodeStreamRetrieveReq
(
SDecoder
*
pDecoder
,
SStreamRetrieveReq
*
pReq
)
{
int32_t
tlen
=
0
;
if
(
tStartDecode
(
pDecoder
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pReq
->
streamId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pReq
->
dstNodeId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pReq
->
dstTaskId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pReq
->
srcNodeId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pReq
->
srcTaskId
)
<
0
)
return
-
1
;
if
(
tDecodeBinary
(
pDecoder
,
(
uint8_t
**
)
&
pReq
->
pRetrieve
,
&
pReq
->
retrieveLen
)
<
0
)
return
-
1
;
uint64_t
len
=
0
;
if
(
tDecodeBinaryAlloc
(
pDecoder
,
(
void
**
)
&
pReq
->
pRetrieve
,
&
len
)
<
0
)
return
-
1
;
pReq
->
retrieveLen
=
len
;
tEndDecode
(
pDecoder
);
return
0
;
}
...
...
source/libs/wal/src/walWrite.c
浏览文件 @
f53521cb
...
...
@@ -23,7 +23,7 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
void
*
pIter
=
NULL
;
while
(
1
)
{
taosHashIterate
(
pWal
->
pRefHash
,
pIter
);
pIter
=
taosHashIterate
(
pWal
->
pRefHash
,
pIter
);
if
(
pIter
==
NULL
)
break
;
SWalRef
*
pRef
=
(
SWalRef
*
)
pIter
;
if
(
pRef
->
ver
!=
-
1
)
{
...
...
@@ -309,7 +309,7 @@ static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
SWalIdxEntry
entry
=
{.
ver
=
ver
,
.
offset
=
offset
};
/*int64_t idxOffset = taosLSeekFile(pWal->pWriteIdxTFile, 0, SEEK_CUR);*/
/*wDebug("write index: ver: %ld, offset: %ld, at %ld", ver, offset, idxOffset);*/
int
size
=
taosWriteFile
(
pWal
->
pWriteIdxTFile
,
&
entry
,
sizeof
(
SWalIdxEntry
));
int
64_t
size
=
taosWriteFile
(
pWal
->
pWriteIdxTFile
,
&
entry
,
sizeof
(
SWalIdxEntry
));
if
(
size
!=
sizeof
(
SWalIdxEntry
))
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
// TODO truncate
...
...
tests/script/jenkins/basic.txt
浏览文件 @
f53521cb
...
...
@@ -90,6 +90,7 @@
./test.sh -f tsim/stream/triggerInterval0.sim
# ./test.sh -f tsim/stream/triggerSession0.sim
./test.sh -f tsim/stream/partitionby.sim
./test.sh -f tsim/stream/partitionby1.sim
./test.sh -f tsim/stream/schedSnode.sim
./test.sh -f tsim/stream/windowClose.sim
...
...
tests/script/tsim/stream/partitionby1.sim
0 → 100644
浏览文件 @
f53521cb
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
sql create database test vgroups 4;
sql use test;
sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int);
sql create table ts1 using st tags(1,1,1);
sql create table ts2 using st tags(2,2,2);
sql create table ts3 using st tags(3,2,2);
sql create table ts4 using st tags(4,2,2);
sql create stream stream_t1 trigger at_once into streamtST1 as select _wstartts, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5 from st partition by tbname interval(10s);
sql insert into ts1 values(1648791213001,1,12,3,1.0);
sql insert into ts2 values(1648791213001,1,12,3,1.0);
sql insert into ts3 values(1648791213001,1,12,3,1.0);
sql insert into ts4 values(1648791213001,1,12,3,1.0);
$loop_count = 0
loop0:
sleep 300
sql select * from streamtST1;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 4 then
print =====rows=$rows
goto loop0
endi
print =====loop0
sql create database test1 vgroups 1;
sql use test1;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table ts1 using st tags(1,2,3);
sql create table ts2 using st tags(1,3,4);
sql create table ts3 using st tags(1,4,5);
sql create stream streams1 trigger at_once into streamt as select _wstartts, count(*) c1, count(a) c2 from st partition by tbname interval(10s);
sql insert into ts1 values(1648791211000,1,2,3);
sql insert into ts2 values(1648791211000,1,2,3);
$loop_count = 0
loop1:
sleep 300
sql select * from streamt;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 2 then
print =====rows=$rows
goto loop1
endi
print =====loop1
sql create database test2 vgroups 1;
sql use test2;
sql create stable st(ts timestamp,a int,b int,c int,id int) tags(ta int,tb int,tc int);
sql create table ts1 using st tags(1,1,1);
sql create table ts2 using st tags(2,2,2);
sql create stream stream_t2 trigger at_once into streamtST as select _wstartts, count(*) c1, count(a) c2 , sum(a) c3 , max(b) c5, min(c) c6, max(id) c7 from st partition by ta interval(10s) ;
sql insert into ts1 values(1648791211000,1,2,3,1);
sql insert into ts1 values(1648791222001,2,2,3,2);
sql insert into ts2 values(1648791211000,1,2,3,3);
sql insert into ts2 values(1648791222001,2,2,3,4);
sql insert into ts2 values(1648791222002,2,2,3,5);
sql insert into ts2 values(1648791222002,2,2,3,6);
sql insert into ts1 values(1648791211000,1,2,3,1);
sql insert into ts1 values(1648791222001,2,2,3,2);
sql insert into ts2 values(1648791211000,1,2,3,3);
sql insert into ts2 values(1648791222001,2,2,3,4);
$loop_count = 0
loop2:
sleep 300
sql select * from streamtST;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 1 then
print =====data01=$data01
goto loop2
endi
if $data02 != 1 then
print =====data02=$data02
goto loop2
endi
if $data03 != 1 then
print =====data03=$data03
goto loop2
endi
if $data04 != 2 then
print =====data04=$data04
goto loop2
endi
print =====loop2
system sh/stop_dnodes.sh
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录