Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
6e5a6e70
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
6e5a6e70
编写于
7月 06, 2022
作者:
L
liuyao
提交者:
GitHub
7月 06, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #14620 from taosdata/feature/TD-17132
feat(stream): session delete
上级
c1667cdd
888f79d3
变更
4
展开全部
隐藏空白更改
内联
并排
Showing
4 changed file
with
349 addition
and
77 deletion
+349
-77
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+6
-1
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+133
-20
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+161
-56
tests/script/tsim/stream/state0.sim
tests/script/tsim/stream/state0.sim
+49
-0
未找到文件。
source/libs/executor/inc/executorimpl.h
浏览文件 @
6e5a6e70
...
...
@@ -319,8 +319,9 @@ typedef enum EStreamScanMode {
STREAM_SCAN_FROM_READERHANDLE
=
1
,
STREAM_SCAN_FROM_RES
,
STREAM_SCAN_FROM_UPDATERES
,
STREAM_SCAN_FROM_DATAREADER
,
STREAM_SCAN_FROM_DATAREADER
,
// todo(liuyao) delete it
STREAM_SCAN_FROM_DATAREADER_RETRIEVE
,
STREAM_SCAN_FROM_DATAREADER_RANGE
,
}
EStreamScanMode
;
typedef
struct
SCatchSupporter
{
...
...
@@ -612,6 +613,7 @@ typedef struct SStreamSessionAggOperatorInfo {
SSDataBlock
*
pWinBlock
;
// window result
SqlFunctionCtx
*
pDummyCtx
;
// for combine
SSDataBlock
*
pDelRes
;
// delete result
bool
returnDelete
;
SSDataBlock
*
pUpdateRes
;
// update window
SHashObj
*
pStDeleted
;
void
*
pDelIterator
;
...
...
@@ -889,6 +891,9 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlF
SResultRow
*
getNewResultRow
(
SDiskbasedBuf
*
pResultBuf
,
int64_t
tableGroupId
,
int32_t
interBufSize
);
SResultWindowInfo
*
getSessionTimeWindow
(
SStreamAggSupporter
*
pAggSup
,
TSKEY
startTs
,
TSKEY
endTs
,
uint64_t
groupId
,
int64_t
gap
,
int32_t
*
pIndex
);
SResultWindowInfo
*
getCurSessionWindow
(
SStreamAggSupporter
*
pAggSup
,
TSKEY
startTs
,
TSKEY
endTs
,
uint64_t
groupId
,
int64_t
gap
,
int32_t
*
pIndex
);
bool
isInTimeWindow
(
STimeWindow
*
pWin
,
TSKEY
ts
,
int64_t
gap
);
int32_t
updateSessionWindowInfo
(
SResultWindowInfo
*
pWinInfo
,
TSKEY
*
pStartTs
,
TSKEY
*
pEndTs
,
int32_t
rows
,
int32_t
start
,
int64_t
gap
,
SHashObj
*
pStDeleted
);
bool
functionNeedToExecute
(
SqlFunctionCtx
*
pCtx
);
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
6e5a6e70
...
...
@@ -792,13 +792,20 @@ static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) {
}
static
bool
isSessionWindow
(
SStreamBlockScanInfo
*
pInfo
)
{
return
pInfo
->
sessionSup
.
parentType
==
QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION
;
return
pInfo
->
sessionSup
.
parentType
==
QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION
||
pInfo
->
sessionSup
.
parentType
==
QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION
;
}
static
bool
isStateWindow
(
SStreamBlockScanInfo
*
pInfo
)
{
return
pInfo
->
sessionSup
.
parentType
==
QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE
;
}
static
bool
isIntervalWindow
(
SStreamBlockScanInfo
*
pInfo
)
{
return
pInfo
->
sessionSup
.
parentType
==
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
||
pInfo
->
sessionSup
.
parentType
==
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL
;
}
static
uint64_t
getGroupId
(
SOperatorInfo
*
pOperator
,
uint64_t
uid
)
{
uint64_t
*
groupId
=
taosHashGet
(
pOperator
->
pTaskInfo
->
tableqinfoList
.
map
,
&
uid
,
sizeof
(
int64_t
));
if
(
groupId
)
{
...
...
@@ -836,6 +843,49 @@ static void setGroupId(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, int32_t
}
}
void
resetTableScanInfo
(
STableScanInfo
*
pTableScanInfo
,
STimeWindow
*
pWin
)
{
pTableScanInfo
->
cond
.
twindows
[
0
]
=
*
pWin
;
pTableScanInfo
->
curTWinIdx
=
0
;
// tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
// if (!pTableScanInfo->dataReader) {
// return false;
// }
pTableScanInfo
->
scanTimes
=
0
;
pTableScanInfo
->
currentGroupId
=
-
1
;
}
static
bool
prepareRangeScan
(
SStreamBlockScanInfo
*
pInfo
,
SSDataBlock
*
pBlock
,
int32_t
*
pRowIndex
)
{
if
((
*
pRowIndex
)
==
pBlock
->
info
.
rows
)
{
return
false
;
}
ASSERT
(
taosArrayGetSize
(
pBlock
->
pDataBlock
)
>=
3
);
SColumnInfoData
*
pStartTsCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
START_TS_COLUMN_INDEX
);
TSKEY
*
startData
=
(
TSKEY
*
)
pStartTsCol
->
pData
;
SColumnInfoData
*
pEndTsCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
END_TS_COLUMN_INDEX
);
TSKEY
*
endData
=
(
TSKEY
*
)
pEndTsCol
->
pData
;
STimeWindow
win
=
{.
skey
=
startData
[
*
pRowIndex
],
.
ekey
=
endData
[
*
pRowIndex
]};
setGroupId
(
pInfo
,
pBlock
,
GROUPID_COLUMN_INDEX
,
*
pRowIndex
);
(
*
pRowIndex
)
++
;
for
(;
*
pRowIndex
<
pBlock
->
info
.
rows
;
(
*
pRowIndex
)
++
)
{
if
(
win
.
skey
==
startData
[
*
pRowIndex
])
{
win
.
ekey
=
TMAX
(
win
.
ekey
,
endData
[
*
pRowIndex
]);
continue
;
}
if
(
win
.
skey
==
endData
[
*
pRowIndex
])
{
win
.
skey
=
TMIN
(
win
.
skey
,
startData
[
*
pRowIndex
]);
continue
;
}
ASSERT
(
(
win
.
skey
>
startData
[
*
pRowIndex
]
&&
win
.
ekey
<
endData
[
*
pRowIndex
])
||
(
isInTimeWindow
(
&
win
,
startData
[
*
pRowIndex
],
0
)
||
isInTimeWindow
(
&
win
,
endData
[
*
pRowIndex
],
0
)
)
);
break
;
}
resetTableScanInfo
(
pInfo
->
pSnapshotReadOp
->
info
,
&
win
);
return
true
;
}
static
bool
prepareDataScan
(
SStreamBlockScanInfo
*
pInfo
,
SSDataBlock
*
pSDB
,
int32_t
tsColIndex
,
int32_t
*
pRowIndex
)
{
STimeWindow
win
=
{
.
skey
=
INT64_MIN
,
...
...
@@ -854,6 +904,7 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int3
SResultWindowInfo
*
pCurWin
=
getSessionTimeWindow
(
pAggSup
,
tsCols
[
*
pRowIndex
],
INT64_MIN
,
pSDB
->
info
.
groupId
,
gap
,
&
winIndex
);
win
=
pCurWin
->
win
;
setGroupId
(
pInfo
,
pSDB
,
GROUPID_COLUMN_INDEX
,
*
pRowIndex
);
(
*
pRowIndex
)
+=
updateSessionWindowInfo
(
pCurWin
,
tsCols
,
NULL
,
pSDB
->
info
.
rows
,
*
pRowIndex
,
gap
,
NULL
);
}
else
{
win
=
...
...
@@ -878,15 +929,7 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int3
if
(
!
needRead
)
{
return
false
;
}
STableScanInfo
*
pTableScanInfo
=
pInfo
->
pSnapshotReadOp
->
info
;
pTableScanInfo
->
cond
.
twindows
[
0
]
=
win
;
pTableScanInfo
->
curTWinIdx
=
0
;
// tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
// if (!pTableScanInfo->dataReader) {
// return false;
// }
pTableScanInfo
->
scanTimes
=
0
;
pTableScanInfo
->
currentGroupId
=
-
1
;
resetTableScanInfo
(
pInfo
->
pSnapshotReadOp
->
info
,
&
win
);
return
true
;
}
...
...
@@ -903,6 +946,26 @@ static void copyOneRow(SSDataBlock* dest, SSDataBlock* source, int32_t sourceRow
dest
->
info
.
rows
++
;
}
static
SSDataBlock
*
doRangeScan
(
SStreamBlockScanInfo
*
pInfo
,
SSDataBlock
*
pSDB
,
int32_t
tsColIndex
,
int32_t
*
pRowIndex
)
{
while
(
1
)
{
SSDataBlock
*
pResult
=
NULL
;
pResult
=
doTableScan
(
pInfo
->
pSnapshotReadOp
);
if
(
!
pResult
&&
prepareRangeScan
(
pInfo
,
pSDB
,
pRowIndex
))
{
// scan next window data
pResult
=
doTableScan
(
pInfo
->
pSnapshotReadOp
);
}
if
(
!
pResult
)
{
blockDataCleanup
(
pSDB
);
*
pRowIndex
=
0
;
return
NULL
;
}
if
(
pResult
->
info
.
groupId
==
pInfo
->
groupId
)
{
return
pResult
;
}
}
}
static
SSDataBlock
*
doDataScan
(
SStreamBlockScanInfo
*
pInfo
,
SSDataBlock
*
pSDB
,
int32_t
tsColIndex
,
int32_t
*
pRowIndex
)
{
while
(
1
)
{
SSDataBlock
*
pResult
=
NULL
;
...
...
@@ -935,7 +998,7 @@ static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, i
*/
}
static
void
copyDeleteDataBlock
(
SStreamBlockScanInfo
*
pInfo
,
SSDataBlock
*
pDelBlock
,
SOperatorInfo
*
pOperator
,
SSDataBlock
*
pUpdateRes
)
{
static
void
generateIntervalTs
(
SStreamBlockScanInfo
*
pInfo
,
SSDataBlock
*
pDelBlock
,
SOperatorInfo
*
pOperator
,
SSDataBlock
*
pUpdateRes
)
{
if
(
pDelBlock
->
info
.
rows
==
0
)
{
return
;
}
...
...
@@ -950,7 +1013,7 @@ static void copyDeleteDataBlock(SStreamBlockScanInfo* pInfo, SSDataBlock* pDelBl
uint64_t
*
uidCol
=
(
uint64_t
*
)
pGpCol
->
pData
;
SColumnInfoData
*
pDestTsCol
=
taosArrayGet
(
pUpdateRes
->
pDataBlock
,
START_TS_COLUMN_INDEX
);
SColumnInfoData
*
pDestGpCol
=
taosArrayGet
(
pUpdateRes
->
pDataBlock
,
DELETE_
GROUPID_COLUMN_INDEX
);
SColumnInfoData
*
pDestGpCol
=
taosArrayGet
(
pUpdateRes
->
pDataBlock
,
GROUPID_COLUMN_INDEX
);
for
(
int32_t
i
=
pInfo
->
deleteDataIndex
;
i
<
pDelBlock
->
info
.
rows
&&
i
<
pDelBlock
->
info
.
capacity
-
(
endData
[
i
]
-
startData
[
i
])
/
pInfo
->
interval
.
interval
-
1
;
i
++
)
{
uint64_t
groupId
=
getGroupId
(
pOperator
,
uidCol
[
i
]);
...
...
@@ -969,6 +1032,40 @@ static void copyDeleteDataBlock(SStreamBlockScanInfo* pInfo, SSDataBlock* pDelBl
}
}
static
void
generateScanRange
(
SStreamBlockScanInfo
*
pInfo
,
SSDataBlock
*
pBlock
,
SOperatorInfo
*
pOperator
,
SSDataBlock
*
pUpdateRes
)
{
if
(
pBlock
->
info
.
rows
==
0
)
{
return
;
}
blockDataCleanup
(
pUpdateRes
);
blockDataEnsureCapacity
(
pUpdateRes
,
pBlock
->
info
.
rows
);
ASSERT
(
taosArrayGetSize
(
pBlock
->
pDataBlock
)
>=
3
);
SColumnInfoData
*
pStartTsCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
START_TS_COLUMN_INDEX
);
TSKEY
*
startData
=
(
TSKEY
*
)
pStartTsCol
->
pData
;
SColumnInfoData
*
pEndTsCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
END_TS_COLUMN_INDEX
);
TSKEY
*
endData
=
(
TSKEY
*
)
pEndTsCol
->
pData
;
SColumnInfoData
*
pGpCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
UID_COLUMN_INDEX
);
uint64_t
*
uidCol
=
(
uint64_t
*
)
pGpCol
->
pData
;
SColumnInfoData
*
pDestStartCol
=
taosArrayGet
(
pUpdateRes
->
pDataBlock
,
START_TS_COLUMN_INDEX
);
SColumnInfoData
*
pDestEndCol
=
taosArrayGet
(
pUpdateRes
->
pDataBlock
,
END_TS_COLUMN_INDEX
);
SColumnInfoData
*
pDestGpCol
=
taosArrayGet
(
pUpdateRes
->
pDataBlock
,
GROUPID_COLUMN_INDEX
);
int32_t
dummy
=
0
;
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
rows
;
i
++
)
{
uint64_t
groupId
=
getGroupId
(
pOperator
,
uidCol
[
i
]);
//gap must be 0.
SResultWindowInfo
*
pStartWin
=
getCurSessionWindow
(
pInfo
->
sessionSup
.
pStreamAggSup
,
startData
[
i
],
endData
[
i
],
groupId
,
0
,
&
dummy
);
if
(
!
pStartWin
)
{
// window has been closed.
continue
;
}
SResultWindowInfo
*
pEndWin
=
getCurSessionWindow
(
pInfo
->
sessionSup
.
pStreamAggSup
,
endData
[
i
],
endData
[
i
],
groupId
,
0
,
&
dummy
);
ASSERT
(
pEndWin
);
colDataAppend
(
pDestStartCol
,
i
,
(
const
char
*
)
&
pStartWin
->
win
.
skey
,
false
);
colDataAppend
(
pDestEndCol
,
i
,
(
const
char
*
)
&
pEndWin
->
win
.
ekey
,
false
);
colDataAppend
(
pDestGpCol
,
i
,
(
const
char
*
)
&
groupId
,
false
);
pUpdateRes
->
info
.
rows
++
;
}
}
static
void
setUpdateData
(
SStreamBlockScanInfo
*
pInfo
,
SSDataBlock
*
pBlock
,
SSDataBlock
*
pUpdateBlock
)
{
blockDataCleanup
(
pUpdateBlock
);
int32_t
size
=
taosArrayGetSize
(
pInfo
->
tsArray
);
...
...
@@ -1001,7 +1098,7 @@ static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDa
}
if
(
size
==
0
)
{
copyDeleteDataBlock
(
pInfo
,
pInfo
->
pDeleteDataRes
,
pInfo
->
pSnapshotReadOp
,
pUpdateBlock
);
generateIntervalTs
(
pInfo
,
pInfo
->
pDeleteDataRes
,
pInfo
->
pSnapshotReadOp
,
pUpdateBlock
);
}
}
...
...
@@ -1061,11 +1158,17 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
break
;
case
STREAM_DELETE_DATA
:
{
pInfo
->
blockType
=
STREAM_INPUT__DATA_SUBMIT
;
pInfo
->
scanMode
=
STREAM_SCAN_FROM_DATAREADER
;
copyDataBlock
(
pInfo
->
pDeleteDataRes
,
pBlock
);
copyDeleteDataBlock
(
pInfo
,
pInfo
->
pDeleteDataRes
,
pInfo
->
pSnapshotReadOp
,
pInfo
->
pUpdateRes
);
pInfo
->
updateResIndex
=
0
;
prepareDataScan
(
pInfo
,
pInfo
->
pUpdateRes
,
START_TS_COLUMN_INDEX
,
&
pInfo
->
updateResIndex
);
if
(
isIntervalWindow
(
pInfo
))
{
copyDataBlock
(
pInfo
->
pDeleteDataRes
,
pBlock
);
generateIntervalTs
(
pInfo
,
pInfo
->
pDeleteDataRes
,
pInfo
->
pSnapshotReadOp
,
pInfo
->
pUpdateRes
);
prepareDataScan
(
pInfo
,
pInfo
->
pUpdateRes
,
START_TS_COLUMN_INDEX
,
&
pInfo
->
updateResIndex
);
pInfo
->
scanMode
=
STREAM_SCAN_FROM_DATAREADER
;
}
else
{
generateScanRange
(
pInfo
,
pBlock
,
pInfo
->
pSnapshotReadOp
,
pInfo
->
pUpdateRes
);
prepareRangeScan
(
pInfo
,
pInfo
->
pUpdateRes
,
&
pInfo
->
updateResIndex
);
pInfo
->
scanMode
=
STREAM_SCAN_FROM_DATAREADER_RANGE
;
}
pInfo
->
pUpdateRes
->
info
.
type
=
STREAM_DELETE_DATA
;
return
pInfo
->
pUpdateRes
;
}
...
...
@@ -1080,8 +1183,10 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
pInfo
->
scanMode
=
STREAM_SCAN_FROM_READERHANDLE
;
return
pInfo
->
pRes
;
}
else
if
(
pInfo
->
scanMode
==
STREAM_SCAN_FROM_UPDATERES
)
{
pInfo
->
scanMode
=
STREAM_SCAN_FROM_DATAREADER
;
if
(
!
isStateWindow
(
pInfo
))
{
if
(
isStateWindow
(
pInfo
))
{
pInfo
->
scanMode
=
STREAM_SCAN_FROM_READERHANDLE
;
}
else
{
pInfo
->
scanMode
=
STREAM_SCAN_FROM_DATAREADER
;
prepareDataScan
(
pInfo
,
pInfo
->
pUpdateRes
,
pInfo
->
primaryTsIndex
,
&
pInfo
->
updateResIndex
);
}
return
pInfo
->
pUpdateRes
;
...
...
@@ -1106,11 +1211,19 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
return
pInfo
->
pUpdateRes
;
}
pInfo
->
scanMode
=
STREAM_SCAN_FROM_READERHANDLE
;
}
else
if
(
pInfo
->
scanMode
==
STREAM_SCAN_FROM_DATAREADER_RANGE
)
{
SSDataBlock
*
pSDB
=
doRangeScan
(
pInfo
,
pInfo
->
pUpdateRes
,
pInfo
->
primaryTsIndex
,
&
pInfo
->
updateResIndex
);
if
(
pSDB
)
{
pSDB
->
info
.
type
=
STREAM_NORMAL
;
checkUpdateData
(
pInfo
,
true
,
pSDB
,
false
);
return
pSDB
;
}
pInfo
->
scanMode
=
STREAM_SCAN_FROM_READERHANDLE
;
}
else
if
(
isStateWindow
(
pInfo
))
{
pInfo
->
scanMode
=
STREAM_SCAN_FROM_DATAREADER
;
pInfo
->
updateResIndex
=
pInfo
->
pUpdateRes
->
info
.
rows
;
if
(
prepareDataScan
(
pInfo
,
pInfo
->
pUpdateRes
,
pInfo
->
primaryTsIndex
,
&
pInfo
->
updateResIndex
))
{
ASSERT
(
pInfo
->
pUpdateRes
->
info
.
rows
==
0
);
blockDataCleanup
(
pInfo
->
pUpdateRes
);
// return empty data blcok
return
pInfo
->
pUpdateRes
;
}
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
6e5a6e70
此差异已折叠。
点击以展开。
tests/script/tsim/stream/state0.sim
浏览文件 @
6e5a6e70
...
...
@@ -449,4 +449,53 @@ if $data26 != 14 then
return -1
endi
sql create database test1 vgroups 1
sql show databases
print $data00 $data01 $data02
sql use test1
sql create table t1(ts timestamp, a int, b int , c int, d double, id int);
sql create stream streams2 trigger at_once into streamt1 as select _wstartts, count(*) c1, count(d) c2 , sum(a) c3 , max(a) c4, min(c) c5, max(id) c from t1 state_window(a);
sql insert into t1 values(1648791212000,2,2,3,1.0,1);
sql insert into t1 values(1648791213000,1,2,3,1.0,1);
sql insert into t1 values(1648791213000,1,2,4,1.0,2);
$loop_count = 0
loop5:
sleep 300
sql select * from streamt1 order by c desc;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 2 then
print =====rows=$rows
goto loop5
endi
if $data01 != 1 then
print =====data01=$data01
goto loop5
endi
if $data05 != 4 then
print =====data05=$data05
goto loop5
endi
if $data11 != 1 then
print =====data11=$data11
goto loop5
endi
if $data15 != 3 then
print =====data15=$data15
goto loop5
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录