Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
497c27d6
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
497c27d6
编写于
6月 04, 2022
作者:
L
liuyao
提交者:
GitHub
6月 04, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #13468 from taosdata/feature/TD-16116
feat(stream): stream state operator
上级
7fa2d3d0
54005072
变更
16
显示空白变更内容
内联
并排
Showing
16 changed file
with
1112 addition
and
86 deletion
+1112
-86
include/libs/nodes/nodes.h
include/libs/nodes/nodes.h
+1
-0
include/libs/nodes/plannodes.h
include/libs/nodes/plannodes.h
+2
-0
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+32
-3
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+15
-2
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+71
-33
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+449
-27
source/libs/function/src/builtinsimpl.c
source/libs/function/src/builtinsimpl.c
+3
-3
source/libs/nodes/src/nodesCodeFuncs.c
source/libs/nodes/src/nodesCodeFuncs.c
+30
-1
source/libs/nodes/src/nodesTraverseFuncs.c
source/libs/nodes/src/nodesTraverseFuncs.c
+2
-1
source/libs/nodes/src/nodesUtilFuncs.c
source/libs/nodes/src/nodesUtilFuncs.c
+2
-0
source/libs/planner/src/planOptimizer.c
source/libs/planner/src/planOptimizer.c
+3
-3
source/libs/planner/src/planPhysiCreater.c
source/libs/planner/src/planPhysiCreater.c
+3
-1
tests/script/tsim/stream/session0.sim
tests/script/tsim/stream/session0.sim
+11
-11
tests/script/tsim/stream/session1.sim
tests/script/tsim/stream/session1.sim
+10
-0
tests/script/tsim/stream/state0.sim
tests/script/tsim/stream/state0.sim
+477
-0
tests/script/tsim/stream/triggerSession0.sim
tests/script/tsim/stream/triggerSession0.sim
+1
-1
未找到文件。
include/libs/nodes/nodes.h
浏览文件 @
497c27d6
...
...
@@ -219,6 +219,7 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW
,
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION_WINDOW
,
QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW
,
QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW
,
QUERY_NODE_PHYSICAL_PLAN_PARTITION
,
QUERY_NODE_PHYSICAL_PLAN_DISPATCH
,
QUERY_NODE_PHYSICAL_PLAN_INSERT
,
...
...
include/libs/nodes/plannodes.h
浏览文件 @
497c27d6
...
...
@@ -338,6 +338,8 @@ typedef struct SStateWinodwPhysiNode {
SNode
*
pStateKey
;
}
SStateWinodwPhysiNode
;
typedef
SStateWinodwPhysiNode
SStreamStateWinodwPhysiNode
;
typedef
struct
SSortPhysiNode
{
SPhysiNode
node
;
SNodeList
*
pExprs
;
// these are expression list of order_by_clause and parameter expression of aggregate function
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
497c27d6
...
...
@@ -366,16 +366,18 @@ typedef struct SCatchSupporter {
}
SCatchSupporter
;
typedef
struct
SStreamAggSupporter
{
SArray
*
pResultRows
;
// SResultWindowInfo
SArray
*
pResultRows
;
int32_t
keySize
;
char
*
pKeyBuf
;
// window key buffer
SDiskbasedBuf
*
pResultBuf
;
// query result buffer based on blocked-wised disk file
int32_t
resultRowSize
;
// the result buffer size for each result row, with the meta data size for each row
SArray
*
pScanWindow
;
}
SStreamAggSupporter
;
typedef
struct
SessionWindowSupporter
{
SStreamAggSupporter
*
pStreamAggSup
;
int64_t
gap
;
uint8_t
parentType
;
}
SessionWindowSupporter
;
typedef
struct
SStreamBlockScanInfo
{
...
...
@@ -406,6 +408,7 @@ typedef struct SStreamBlockScanInfo {
SArray
*
childIds
;
SessionWindowSupporter
sessionSup
;
bool
assignBlockUid
;
// assign block uid to groupId, temporarily used for generating rollup SMA.
int32_t
scanWinIndex
;
}
SStreamBlockScanInfo
;
typedef
struct
SSysTableScanInfo
{
...
...
@@ -593,6 +596,11 @@ typedef struct SResultWindowInfo {
bool
isClosed
;
}
SResultWindowInfo
;
typedef
struct
SStateWindowInfo
{
SResultWindowInfo
winInfo
;
SStateKeys
stateKey
;
}
SStateWindowInfo
;
typedef
struct
SStreamSessionAggOperatorInfo
{
SOptrBasicInfo
binfo
;
SStreamAggSupporter
streamAggSup
;
...
...
@@ -606,7 +614,7 @@ typedef struct SStreamSessionAggOperatorInfo {
SSDataBlock
*
pDelRes
;
SHashObj
*
pStDeleted
;
void
*
pDelIterator
;
SArray
*
pChildren
;
// cache for children's result;
SArray
*
pChildren
;
// cache for children's result;
final stream operator
}
SStreamSessionAggOperatorInfo
;
typedef
struct
STimeSliceOperatorInfo
{
...
...
@@ -630,6 +638,22 @@ typedef struct SStateWindowOperatorInfo {
// bool reptScan;
}
SStateWindowOperatorInfo
;
typedef
struct
SStreamStateAggOperatorInfo
{
SOptrBasicInfo
binfo
;
SStreamAggSupporter
streamAggSup
;
SGroupResInfo
groupResInfo
;
int32_t
primaryTsIndex
;
// primary timestamp slot id
int32_t
order
;
// current SSDataBlock scan order
STimeWindowAggSupp
twAggSup
;
SColumn
stateCol
;
// start row index
SqlFunctionCtx
*
pDummyCtx
;
// for combine
SSDataBlock
*
pDelRes
;
SHashObj
*
pSeDeleted
;
void
*
pDelIterator
;
SArray
*
pScanWindow
;
SArray
*
pChildren
;
// cache for children's result;
}
SStreamStateAggOperatorInfo
;
typedef
struct
SSortedMergeOperatorInfo
{
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo
binfo
;
...
...
@@ -787,6 +811,10 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SExprInfo* pE
SOperatorInfo
*
createStreamSessionAggOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
int64_t
gap
,
int32_t
tsSlotId
,
STimeWindowAggSupp
*
pTwAggSupp
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createStreamStateAggOperatorInfo
(
SOperatorInfo
*
downstream
,
SPhysiNode
*
pPhyNode
,
SExecTaskInfo
*
pTaskInfo
);
#if 0
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
#endif
...
...
@@ -838,7 +866,8 @@ int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimary
int32_t
startPos
,
TSKEY
ekey
,
__block_search_fn_t
searchFn
,
STableQueryInfo
*
item
,
int32_t
order
);
int32_t
binarySearchForKey
(
char
*
pValue
,
int
num
,
TSKEY
key
,
int
order
);
int32_t
initStreamAggSupporter
(
SStreamAggSupporter
*
pSup
,
const
char
*
pKey
);
int32_t
initSessionAggSupporter
(
SStreamAggSupporter
*
pSup
,
const
char
*
pKey
);
int32_t
initStateAggSupporter
(
SStreamAggSupporter
*
pSup
,
const
char
*
pKey
);
SResultRow
*
getNewResultRow
(
SDiskbasedBuf
*
pResultBuf
,
int64_t
tableGroupId
,
int32_t
interBufSize
);
SResultWindowInfo
*
getSessionTimeWindow
(
SArray
*
pWinInfos
,
TSKEY
ts
,
int64_t
gap
,
int32_t
*
pIndex
);
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
497c27d6
...
...
@@ -4600,6 +4600,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SColumnNode
*
pColNode
=
(
SColumnNode
*
)((
STargetNode
*
)
pStateNode
->
pStateKey
)
->
pExpr
;
SColumn
col
=
extractColumnFromColumnNode
(
pColNode
);
pOptr
=
createStatewindowOperatorInfo
(
ops
[
0
],
pExprInfo
,
num
,
pResBlock
,
&
as
,
tsSlotId
,
&
col
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW
==
type
)
{
pOptr
=
createStreamStateAggOperatorInfo
(
ops
[
0
],
pPhyNode
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_JOIN
==
type
)
{
SJoinPhysiNode
*
pJoinNode
=
(
SJoinPhysiNode
*
)
pPhyNode
;
SSDataBlock
*
pResBlock
=
createResDataBlock
(
pPhyNode
->
pOutputDataBlockDesc
);
...
...
@@ -5185,14 +5187,17 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo
return
TSDB_CODE_SUCCESS
;
}
int32_t
initStreamAggSupporter
(
SStreamAggSupporter
*
pSup
,
const
char
*
pKey
)
{
int32_t
initStreamAggSupporter
(
SStreamAggSupporter
*
pSup
,
const
char
*
pKey
,
size_t
size
)
{
pSup
->
keySize
=
sizeof
(
int64_t
)
+
sizeof
(
TSKEY
);
pSup
->
pKeyBuf
=
taosMemoryCalloc
(
1
,
pSup
->
keySize
);
pSup
->
pResultRows
=
taosArrayInit
(
1024
,
size
of
(
SResultWindowInfo
)
);
pSup
->
pResultRows
=
taosArrayInit
(
1024
,
size
);
if
(
pSup
->
pKeyBuf
==
NULL
||
pSup
->
pResultRows
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
pSup
->
pScanWindow
=
taosArrayInit
(
4
,
sizeof
(
STimeWindow
));
int32_t
pageSize
=
4096
;
while
(
pageSize
<
pSup
->
resultRowSize
*
4
)
{
pageSize
<<=
1u
;
...
...
@@ -5205,6 +5210,14 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey) {
return
createDiskbasedBuf
(
&
pSup
->
pResultBuf
,
pageSize
,
bufSize
,
pKey
,
TD_TMP_DIR_PATH
);
}
int32_t
initSessionAggSupporter
(
SStreamAggSupporter
*
pSup
,
const
char
*
pKey
)
{
return
initStreamAggSupporter
(
pSup
,
pKey
,
sizeof
(
SResultWindowInfo
));
}
int32_t
initStateAggSupporter
(
SStreamAggSupporter
*
pSup
,
const
char
*
pKey
)
{
return
initStreamAggSupporter
(
pSup
,
pKey
,
sizeof
(
SStateWindowInfo
));
}
int64_t
getSmaWaterMark
(
int64_t
interval
,
double
filesFactor
)
{
int64_t
waterMark
=
0
;
ASSERT
(
FLT_GREATEREQUAL
(
filesFactor
,
0
.
000000
));
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
497c27d6
...
...
@@ -706,16 +706,23 @@ static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) {
taosArrayClear
(
pInfo
->
pBlockLists
);
}
static
bool
isSessionWindow
(
SStreamBlockScanInfo
*
pInfo
)
{
return
pInfo
->
sessionSup
.
pStreamAggSup
!=
NULL
;
}
static
bool
isSessionWindow
(
SStreamBlockScanInfo
*
pInfo
)
{
return
pInfo
->
sessionSup
.
parentType
==
QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW
;
}
static
bool
isStateWindow
(
SStreamBlockScanInfo
*
pInfo
)
{
return
pInfo
->
sessionSup
.
parentType
==
QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW
;
}
static
bool
prepareDataScan
(
SStreamBlockScanInfo
*
pInfo
)
{
SSDataBlock
*
pSDB
=
pInfo
->
pUpdateRes
;
if
(
pInfo
->
updateResIndex
<
pSDB
->
info
.
rows
)
{
SColumnInfoData
*
pColDataInfo
=
taosArrayGet
(
pSDB
->
pDataBlock
,
0
);
STimeWindow
win
=
{.
skey
=
INT64_MIN
,
.
ekey
=
INT64_MAX
,};
bool
needRead
=
false
;
if
(
!
isStateWindow
(
pInfo
)
&&
pInfo
->
updateResIndex
<
pSDB
->
info
.
rows
)
{
SColumnInfoData
*
pColDataInfo
=
taosArrayGet
(
pSDB
->
pDataBlock
,
pInfo
->
primaryTsIndex
);
TSKEY
*
tsCols
=
(
TSKEY
*
)
pColDataInfo
->
pData
;
SResultRowInfo
dumyInfo
;
dumyInfo
.
cur
.
pageId
=
-
1
;
STimeWindow
win
;
if
(
isSessionWindow
(
pInfo
))
{
SStreamAggSupporter
*
pAggSup
=
pInfo
->
sessionSup
.
pStreamAggSup
;
int64_t
gap
=
pInfo
->
sessionSup
.
gap
;
...
...
@@ -731,15 +738,28 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo) {
pInfo
->
updateResIndex
+=
getNumOfRowsInTimeWindow
(
&
pSDB
->
info
,
tsCols
,
pInfo
->
updateResIndex
,
win
.
ekey
,
binarySearchForKey
,
NULL
,
TSDB_ORDER_ASC
);
}
needRead
=
true
;
}
else
if
(
isStateWindow
(
pInfo
))
{
SArray
*
pWins
=
pInfo
->
sessionSup
.
pStreamAggSup
->
pScanWindow
;
int32_t
size
=
taosArrayGetSize
(
pWins
);
if
(
pInfo
->
scanWinIndex
<
size
)
{
win
=
*
(
STimeWindow
*
)
taosArrayGet
(
pWins
,
pInfo
->
scanWinIndex
);
pInfo
->
scanWinIndex
++
;
needRead
=
true
;
}
else
{
pInfo
->
scanWinIndex
=
0
;
taosArrayClear
(
pWins
);
}
}
if
(
!
needRead
)
{
return
false
;
}
STableScanInfo
*
pTableScanInfo
=
pInfo
->
pOperatorDumy
->
info
;
pTableScanInfo
->
cond
.
twindows
[
0
]
=
win
;
pTableScanInfo
->
curTWinIdx
=
0
;
tsdbResetReadHandle
(
pTableScanInfo
->
dataReader
,
&
pTableScanInfo
->
cond
,
0
);
pTableScanInfo
->
scanTimes
=
0
;
return
true
;
}
else
{
return
false
;
}
}
static
SSDataBlock
*
doDataScan
(
SStreamBlockScanInfo
*
pInfo
)
{
...
...
@@ -754,36 +774,39 @@ static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo) {
return
pResult
;
}
static
SSDataBlock
*
getUpdateDataBlock
(
SStreamBlockScanInfo
*
pInfo
,
bool
invertible
)
{
SColumnInfoData
*
pColDataInfo
=
taosArrayGet
(
pInfo
->
pRes
->
pDataBlock
,
pInfo
->
primaryTsIndex
);
static
void
getUpdateDataBlock
(
SStreamBlockScanInfo
*
pInfo
,
bool
invertible
,
SSDataBlock
*
pBlock
,
SSDataBlock
*
pUpdateBlock
)
{
SColumnInfoData
*
pColDataInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
pInfo
->
primaryTsIndex
);
ASSERT
(
pColDataInfo
->
info
.
type
==
TSDB_DATA_TYPE_TIMESTAMP
);
TSKEY
*
ts
=
(
TSKEY
*
)
pColDataInfo
->
pData
;
for
(
int32_t
i
=
0
;
i
<
p
Info
->
pRes
->
info
.
rows
;
i
++
)
{
if
(
updateInfoIsUpdated
(
pInfo
->
pUpdateInfo
,
p
Info
->
pRes
->
info
.
uid
,
ts
[
i
]))
{
for
(
int32_t
i
=
0
;
i
<
p
Block
->
info
.
rows
;
i
++
)
{
if
(
updateInfoIsUpdated
(
pInfo
->
pUpdateInfo
,
p
Block
->
info
.
uid
,
ts
[
i
]))
{
taosArrayPush
(
pInfo
->
tsArray
,
ts
+
i
);
}
}
if
(
!
pUpdateBlock
)
{
taosArrayClear
(
pInfo
->
tsArray
);
return
;
}
int32_t
size
=
taosArrayGetSize
(
pInfo
->
tsArray
);
if
(
size
>
0
&&
invertible
)
{
// T
ODO
(liuyao) get from tsdb
// SSDataBlock* p = createOneDataBlock(p
Info->pRes
, true);
// T
odo
(liuyao) get from tsdb
// SSDataBlock* p = createOneDataBlock(p
Block
, true);
// p->info.type = STREAM_INVERT;
// taosArrayClear(pInfo->tsArray);
// return p;
SSDataBlock
*
pDataBlock
=
createOneDataBlock
(
pInfo
->
pRes
,
false
);
SColumnInfoData
*
pCol
=
(
SColumnInfoData
*
)
taosArrayGet
(
pDataBlock
->
pDataBlock
,
0
);
SColumnInfoData
*
pCol
=
(
SColumnInfoData
*
)
taosArrayGet
(
pUpdateBlock
->
pDataBlock
,
pInfo
->
primaryTsIndex
);
ASSERT
(
pCol
->
info
.
type
==
TSDB_DATA_TYPE_TIMESTAMP
);
colInfoDataEnsureCapacity
(
pCol
,
0
,
size
);
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
TSKEY
*
pTs
=
(
TSKEY
*
)
taosArrayGet
(
pInfo
->
tsArray
,
i
);
colDataAppend
(
pCol
,
i
,
(
char
*
)
pTs
,
false
);
}
p
Data
Block
->
info
.
rows
=
size
;
p
Data
Block
->
info
.
type
=
STREAM_REPROCESS
;
blockDataUpdateTsWindow
(
p
Data
Block
,
0
);
p
Update
Block
->
info
.
rows
=
size
;
p
Update
Block
->
info
.
type
=
STREAM_REPROCESS
;
blockDataUpdateTsWindow
(
p
Update
Block
,
0
);
taosArrayClear
(
pInfo
->
tsArray
);
return
pDataBlock
;
}
return
NULL
;
}
static
SSDataBlock
*
doStreamBlockScan
(
SOperatorInfo
*
pOperator
)
{
...
...
@@ -815,16 +838,27 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
}
else
if
(
pInfo
->
scanMode
==
STREAM_SCAN_FROM_UPDATERES
)
{
blockDataCleanup
(
pInfo
->
pRes
);
pInfo
->
scanMode
=
STREAM_SCAN_FROM_DATAREADER
;
if
(
!
isStateWindow
(
pInfo
))
{
prepareDataScan
(
pInfo
);
}
return
pInfo
->
pUpdateRes
;
}
else
if
(
pInfo
->
scanMode
==
STREAM_SCAN_FROM_DATAREADER
)
{
}
else
{
if
(
isStateWindow
(
pInfo
)
&&
taosArrayGetSize
(
pInfo
->
sessionSup
.
pStreamAggSup
->
pScanWindow
)
>
0
)
{
pInfo
->
scanMode
=
STREAM_SCAN_FROM_DATAREADER
;
pInfo
->
updateResIndex
=
pInfo
->
pUpdateRes
->
info
.
rows
;
prepareDataScan
(
pInfo
);
}
if
(
pInfo
->
scanMode
==
STREAM_SCAN_FROM_DATAREADER
)
{
SSDataBlock
*
pSDB
=
doDataScan
(
pInfo
);
if
(
pSDB
==
NULL
)
{
pInfo
->
scanMode
=
STREAM_SCAN_FROM_READERHANDLE
;
}
else
{
getUpdateDataBlock
(
pInfo
,
true
,
pSDB
,
NULL
);
return
pSDB
;
}
}
}
SDataBlockInfo
*
pBlockInfo
=
&
pInfo
->
pRes
->
info
;
blockDataCleanup
(
pInfo
->
pRes
);
...
...
@@ -906,7 +940,8 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
if
(
rows
==
0
)
{
pOperator
->
status
=
OP_EXEC_DONE
;
}
else
if
(
pInfo
->
pUpdateInfo
)
{
SSDataBlock
*
upRes
=
getUpdateDataBlock
(
pInfo
,
true
);
SSDataBlock
*
upRes
=
createOneDataBlock
(
pInfo
->
pRes
,
false
);
getUpdateDataBlock
(
pInfo
,
true
,
pInfo
->
pRes
,
upRes
);
if
(
upRes
)
{
pInfo
->
pUpdateRes
=
upRes
;
if
(
upRes
->
info
.
type
==
STREAM_REPROCESS
)
{
...
...
@@ -950,6 +985,9 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
int16_t
colId
=
id
->
colId
;
taosArrayPush
(
pColIds
,
&
colId
);
if
(
id
->
colId
==
pTableScanNode
->
tsColId
)
{
pInfo
->
primaryTsIndex
=
id
->
targetSlotId
;
}
}
// set the extract column id to streamHandle
...
...
@@ -974,7 +1012,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
pTwSup
->
waterMark
=
getSmaWaterMark
(
pSTInfo
->
interval
.
interval
,
pTableScanNode
->
filesFactor
);
}
pInfo
->
primaryTsIndex
=
0
;
// pTableScanNode->tsColId;
if
(
pSTInfo
->
interval
.
interval
>
0
&&
pDataReader
)
{
pInfo
->
pUpdateInfo
=
updateInfoInitP
(
&
pSTInfo
->
interval
,
pTwSup
->
waterMark
);
}
else
{
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
497c27d6
...
...
@@ -10,7 +10,7 @@ typedef enum SResultTsInterpType {
}
SResultTsInterpType
;
static
SSDataBlock
*
doStreamFinalIntervalAgg
(
SOperatorInfo
*
pOperator
);
static
SSDataBlock
*
doStreamSession
Window
Agg
(
SOperatorInfo
*
pOperator
);
static
SSDataBlock
*
doStreamSessionAgg
(
SOperatorInfo
*
pOperator
);
static
int64_t
*
extractTsCol
(
SSDataBlock
*
pBlock
,
const
SIntervalAggOperatorInfo
*
pInfo
);
...
...
@@ -2097,12 +2097,13 @@ void initDummyFunction(SqlFunctionCtx* pDummy, SqlFunctionCtx* pCtx, int32_t num
pDummy
[
i
].
functionId
=
pCtx
[
i
].
functionId
;
}
}
void
initDownStream
(
SOperatorInfo
*
downstream
,
SStreamSessionAggOperatorInfo
*
pInfo
)
{
void
initDownStream
(
SOperatorInfo
*
downstream
,
SStreamAggSupporter
*
pAggSup
,
int64_t
gap
,
int64_t
waterMark
,
uint8_t
type
)
{
ASSERT
(
downstream
->
operatorType
==
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
);
SStreamBlockScanInfo
*
pScanInfo
=
downstream
->
info
;
pScanInfo
->
sessionSup
=
(
SessionWindowSupporter
){.
pStreamAggSup
=
&
pInfo
->
streamAggSup
,
.
gap
=
pInfo
->
gap
};
pScanInfo
->
pUpdateInfo
=
updateInfoInit
(
60000
,
TSDB_TIME_PRECISION_MILLI
,
60000
*
60
*
6
);
(
SessionWindowSupporter
){.
pStreamAggSup
=
pAggSup
,
.
gap
=
gap
,
.
parentType
=
type
};
pScanInfo
->
pUpdateInfo
=
updateInfoInit
(
60000
,
TSDB_TIME_PRECISION_MILLI
,
waterMark
);
}
SOperatorInfo
*
createStreamSessionAggOperatorInfo
(
SOperatorInfo
*
downstream
,
...
...
@@ -2118,7 +2119,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream,
initResultSizeInfo
(
pOperator
,
4096
);
code
=
initS
tream
AggSupporter
(
&
pInfo
->
streamAggSup
,
"StreamSessionAggOperatorInfo"
);
code
=
initS
ession
AggSupporter
(
&
pInfo
->
streamAggSup
,
"StreamSessionAggOperatorInfo"
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
...
...
@@ -2158,11 +2159,12 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream,
pOperator
->
pExpr
=
pExprInfo
;
pOperator
->
numOfExprs
=
numOfCols
;
pOperator
->
info
=
pInfo
;
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doStreamSession
Window
Agg
,
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doStreamSessionAgg
,
NULL
,
NULL
,
destroyStreamSessionAggOperatorInfo
,
aggEncodeResultRow
,
aggDecodeResultRow
,
NULL
);
pOperator
->
pTaskInfo
=
pTaskInfo
;
initDownStream
(
downstream
,
pInfo
);
initDownStream
(
downstream
,
&
pInfo
->
streamAggSup
,
pInfo
->
gap
,
pInfo
->
twAggSup
.
waterMark
,
pOperator
->
operatorType
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
return
pOperator
;
...
...
@@ -2208,6 +2210,7 @@ SResultWindowInfo* getSessionTimeWindow(SArray* pWinInfos, TSKEY ts, int64_t gap
int32_t
*
pIndex
)
{
int32_t
size
=
taosArrayGetSize
(
pWinInfos
);
if
(
size
==
0
)
{
*
pIndex
=
0
;
return
addNewSessionWindow
(
pWinInfos
,
ts
);
}
// find the first position which is smaller than the key
...
...
@@ -2234,8 +2237,8 @@ SResultWindowInfo* getSessionTimeWindow(SArray* pWinInfos, TSKEY ts, int64_t gap
*
pIndex
=
taosArrayGetSize
(
pWinInfos
);
return
addNewSessionWindow
(
pWinInfos
,
ts
);
}
*
pIndex
=
index
;
return
insertNewSessionWindow
(
pWinInfos
,
ts
,
index
);
*
pIndex
=
index
+
1
;
return
insertNewSessionWindow
(
pWinInfos
,
ts
,
index
+
1
);
}
int32_t
updateSessionWindowInfo
(
SResultWindowInfo
*
pWinInfo
,
TSKEY
*
pTs
,
int32_t
rows
,
...
...
@@ -2290,24 +2293,41 @@ static int32_t setWindowOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pRes
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
doOneWindowAgg
(
SStreamSessionAggOperatorInfo
*
pInfo
,
static
int32_t
doOneWindowAggImpl
(
int32_t
tsColId
,
SOptrBasicInfo
*
pBinfo
,
SStreamAggSupporter
*
pAggSup
,
SColumnInfoData
*
pTimeWindowData
,
SSDataBlock
*
pSDataBlock
,
SResultWindowInfo
*
pCurWin
,
SResultRow
**
pResult
,
int32_t
startIndex
,
int32_t
winRows
,
int32_t
numOutput
,
SExecTaskInfo
*
pTaskInfo
)
{
int32_t
startIndex
,
int32_t
winRows
,
int32_t
numOutput
,
SExecTaskInfo
*
pTaskInfo
)
{
SColumnInfoData
*
pColDataInfo
=
taosArrayGet
(
pSDataBlock
->
pDataBlock
,
pInfo
->
primaryTsIndex
);
taosArrayGet
(
pSDataBlock
->
pDataBlock
,
tsColId
);
TSKEY
*
tsCols
=
(
int64_t
*
)
pColDataInfo
->
pData
;
int32_t
code
=
setWindowOutputBuf
(
pCurWin
,
pResult
,
pInfo
->
binfo
.
pCtx
,
pSDataBlock
->
info
.
groupId
,
numOutput
,
pInfo
->
binfo
.
rowCellInfoOffset
,
&
pInfo
->
streamAggSup
,
pTaskInfo
);
int32_t
code
=
setWindowOutputBuf
(
pCurWin
,
pResult
,
pBinfo
->
pCtx
,
pSDataBlock
->
info
.
groupId
,
numOutput
,
pBinfo
->
rowCellInfoOffset
,
pAggSup
,
pTaskInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
||
(
*
pResult
)
==
NULL
)
{
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
}
updateTimeWindowInfo
(
&
pInfo
->
twAggSup
.
timeWindowData
,
&
pCurWin
->
win
,
true
);
doApplyFunctions
(
pTaskInfo
,
pInfo
->
binfo
.
pCtx
,
&
pCurWin
->
win
,
&
pInfo
->
twAggSup
.
timeWindowData
,
startIndex
,
winRows
,
tsCols
,
pSDataBlock
->
info
.
rows
,
numOutput
,
TSDB_ORDER_ASC
);
updateTimeWindowInfo
(
pTimeWindowData
,
&
pCurWin
->
win
,
true
);
doApplyFunctions
(
pTaskInfo
,
pBinfo
->
pCtx
,
&
pCurWin
->
win
,
pTimeWindowData
,
startIndex
,
winRows
,
tsCols
,
pSDataBlock
->
info
.
rows
,
numOutput
,
TSDB_ORDER_ASC
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
doOneWindowAgg
(
SStreamSessionAggOperatorInfo
*
pInfo
,
SSDataBlock
*
pSDataBlock
,
SResultWindowInfo
*
pCurWin
,
SResultRow
**
pResult
,
int32_t
startIndex
,
int32_t
winRows
,
int32_t
numOutput
,
SExecTaskInfo
*
pTaskInfo
)
{
return
doOneWindowAggImpl
(
pInfo
->
primaryTsIndex
,
&
pInfo
->
binfo
,
&
pInfo
->
streamAggSup
,
&
pInfo
->
twAggSup
.
timeWindowData
,
pSDataBlock
,
pCurWin
,
pResult
,
startIndex
,
winRows
,
numOutput
,
pTaskInfo
);
}
static
int32_t
doOneStateWindowAgg
(
SStreamStateAggOperatorInfo
*
pInfo
,
SSDataBlock
*
pSDataBlock
,
SResultWindowInfo
*
pCurWin
,
SResultRow
**
pResult
,
int32_t
startIndex
,
int32_t
winRows
,
int32_t
numOutput
,
SExecTaskInfo
*
pTaskInfo
)
{
return
doOneWindowAggImpl
(
pInfo
->
primaryTsIndex
,
&
pInfo
->
binfo
,
&
pInfo
->
streamAggSup
,
&
pInfo
->
twAggSup
.
timeWindowData
,
pSDataBlock
,
pCurWin
,
pResult
,
startIndex
,
winRows
,
numOutput
,
pTaskInfo
);
}
int32_t
copyWinInfoToDataBlock
(
SSDataBlock
*
pBlock
,
SStreamAggSupporter
*
pAggSup
,
int32_t
start
,
int32_t
num
,
int32_t
numOfExprs
,
SOptrBasicInfo
*
pBinfo
)
{
for
(
int32_t
i
=
start
;
i
<
num
;
i
+=
1
)
{
...
...
@@ -2366,7 +2386,7 @@ void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex,
}
}
static
void
doStreamSession
Window
AggImpl
(
SOperatorInfo
*
pOperator
,
static
void
doStreamSessionAggImpl
(
SOperatorInfo
*
pOperator
,
SSDataBlock
*
pSDataBlock
,
SHashObj
*
pStUpdated
,
SHashObj
*
pStDeleted
)
{
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SStreamSessionAggOperatorInfo
*
pInfo
=
pOperator
->
info
;
...
...
@@ -2395,8 +2415,8 @@ static void doStreamSessionWindowAggImpl(SOperatorInfo* pOperator,
int32_t
winIndex
=
0
;
SResultWindowInfo
*
pCurWin
=
getSessionTimeWindow
(
pAggSup
->
pResultRows
,
tsCols
[
i
],
gap
,
&
winIndex
);
winRows
=
updateSessionWindowInfo
(
pCurWin
,
tsCols
,
pSDataBlock
->
info
.
rows
,
i
,
pInfo
->
gap
,
pStDeleted
);
winRows
=
updateSessionWindowInfo
(
pCurWin
,
tsCols
,
pSDataBlock
->
info
.
rows
,
i
,
pInfo
->
gap
,
pStDeleted
);
code
=
doOneWindowAgg
(
pInfo
,
pSDataBlock
,
pCurWin
,
&
pResult
,
i
,
winRows
,
numOfOutput
,
pTaskInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
||
pResult
==
NULL
)
{
longjmp
(
pTaskInfo
->
env
,
TSDB_CODE_QRY_OUT_OF_MEMORY
);
...
...
@@ -2512,12 +2532,21 @@ bool isFinalSession(SStreamSessionAggOperatorInfo* pInfo) {
return
pInfo
->
pChildren
!=
NULL
;
}
typedef
SResultWindowInfo
*
(
*
__get_win_info_
)(
void
*
);
SResultWindowInfo
*
getSessionWinInfo
(
void
*
pData
)
{
return
(
SResultWindowInfo
*
)
pData
;
}
SResultWindowInfo
*
getStateWinInfo
(
void
*
pData
)
{
return
&
((
SStateWindowInfo
*
)
pData
)
->
winInfo
;
}
int32_t
closeSessionWindow
(
SArray
*
pWins
,
STimeWindowAggSupp
*
pTwSup
,
SArray
*
pClosed
,
int8_t
calTrigger
)
{
int8_t
calTrigger
,
__get_win_info_
fn
)
{
// Todo(liuyao) save window to tdb
int32_t
size
=
taosArrayGetSize
(
pWins
);
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
SResultWindowInfo
*
pSeWin
=
taosArrayGet
(
pWins
,
i
);
void
*
pWin
=
taosArrayGet
(
pWins
,
i
);
SResultWindowInfo
*
pSeWin
=
fn
(
pWin
);
if
(
pSeWin
->
win
.
ekey
<
pTwSup
->
maxTs
-
pTwSup
->
waterMark
)
{
if
(
!
pSeWin
->
isClosed
)
{
SResKeyPos
*
pos
=
taosMemoryMalloc
(
sizeof
(
SResKeyPos
)
+
sizeof
(
uint64_t
));
...
...
@@ -2543,7 +2572,7 @@ int32_t closeSessionWindow(SArray *pWins, STimeWindowAggSupp *pTwSup, SArray *pC
return
TSDB_CODE_SUCCESS
;
}
static
SSDataBlock
*
doStreamSession
Window
Agg
(
SOperatorInfo
*
pOperator
)
{
static
SSDataBlock
*
doStreamSessionAgg
(
SOperatorInfo
*
pOperator
)
{
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
return
NULL
;
}
...
...
@@ -2592,9 +2621,9 @@ static SSDataBlock* doStreamSessionWindowAgg(SOperatorInfo* pOperator) {
if
(
isFinalSession
(
pInfo
))
{
int32_t
childIndex
=
0
;
//Todo(liuyao) get child id from SSDataBlock
SOptrBasicInfo
*
pChildOp
=
taosArrayGetP
(
pInfo
->
pChildren
,
childIndex
);
doStreamSession
Window
AggImpl
(
pOperator
,
pBlock
,
NULL
,
NULL
);
doStreamSessionAggImpl
(
pOperator
,
pBlock
,
NULL
,
NULL
);
}
doStreamSession
Window
AggImpl
(
pOperator
,
pBlock
,
pStUpdated
,
pInfo
->
pStDeleted
);
doStreamSessionAggImpl
(
pOperator
,
pBlock
,
pStUpdated
,
pInfo
->
pStDeleted
);
pInfo
->
twAggSup
.
maxTs
=
TMAX
(
pInfo
->
twAggSup
.
maxTs
,
pBlock
->
info
.
window
.
ekey
);
}
// restore the value
...
...
@@ -2602,7 +2631,7 @@ static SSDataBlock* doStreamSessionWindowAgg(SOperatorInfo* pOperator) {
SArray
*
pClosed
=
taosArrayInit
(
16
,
POINTER_BYTES
);
closeSessionWindow
(
pInfo
->
streamAggSup
.
pResultRows
,
&
pInfo
->
twAggSup
,
pClosed
,
pInfo
->
twAggSup
.
calTrigger
);
pInfo
->
twAggSup
.
calTrigger
,
getSessionWinInfo
);
SArray
*
pUpdated
=
taosArrayInit
(
16
,
POINTER_BYTES
);
copyUpdateResult
(
pStUpdated
,
pUpdated
,
pBInfo
->
pRes
->
info
.
groupId
);
taosHashCleanup
(
pStUpdated
);
...
...
@@ -2658,3 +2687,396 @@ _error:
pTaskInfo
->
code
=
code
;
return
NULL
;
}
void
destroyStreamStateOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
SStreamStateAggOperatorInfo
*
pInfo
=
(
SStreamStateAggOperatorInfo
*
)
param
;
doDestroyBasicInfo
(
&
pInfo
->
binfo
,
numOfOutput
);
destroyStreamAggSupporter
(
&
pInfo
->
streamAggSup
);
cleanupGroupResInfo
(
&
pInfo
->
groupResInfo
);
if
(
pInfo
->
pChildren
!=
NULL
)
{
int32_t
size
=
taosArrayGetSize
(
pInfo
->
pChildren
);
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
SOperatorInfo
*
pChild
=
taosArrayGetP
(
pInfo
->
pChildren
,
i
);
SStreamSessionAggOperatorInfo
*
pChInfo
=
pChild
->
info
;
destroyStreamSessionAggOperatorInfo
(
pChInfo
,
numOfOutput
);
taosMemoryFreeClear
(
pChild
);
taosMemoryFreeClear
(
pChInfo
);
}
}
}
int64_t
getStateWinTsKey
(
void
*
data
,
int32_t
index
)
{
SStateWindowInfo
*
pStateWin
=
taosArrayGet
(
data
,
index
);
return
pStateWin
->
winInfo
.
win
.
ekey
;
}
SStateWindowInfo
*
addNewStateWindow
(
SArray
*
pWinInfos
,
TSKEY
ts
,
char
*
pKeyData
,
SColumn
*
pCol
)
{
SStateWindowInfo
win
=
{.
stateKey
.
bytes
=
pCol
->
bytes
,
.
stateKey
.
type
=
pCol
->
type
,
.
stateKey
.
pData
=
taosMemoryCalloc
(
1
,
pCol
->
bytes
),
.
winInfo
.
pos
.
offset
=
-
1
,
.
winInfo
.
pos
.
pageId
=
-
1
,
.
winInfo
.
win
.
skey
=
ts
,
.
winInfo
.
win
.
ekey
=
ts
,
.
winInfo
.
isOutput
=
false
,
.
winInfo
.
isClosed
=
false
,
};
if
(
IS_VAR_DATA_TYPE
(
win
.
stateKey
.
type
))
{
varDataCopy
(
win
.
stateKey
.
pData
,
pKeyData
);
}
else
{
memcpy
(
win
.
stateKey
.
pData
,
pKeyData
,
win
.
stateKey
.
bytes
);
}
return
taosArrayPush
(
pWinInfos
,
&
win
);
}
SStateWindowInfo
*
insertNewStateWindow
(
SArray
*
pWinInfos
,
TSKEY
ts
,
char
*
pKeyData
,
int32_t
index
,
SColumn
*
pCol
)
{
SStateWindowInfo
win
=
{.
stateKey
.
bytes
=
pCol
->
bytes
,
.
stateKey
.
type
=
pCol
->
type
,
.
stateKey
.
pData
=
taosMemoryCalloc
(
1
,
pCol
->
bytes
),
.
winInfo
.
pos
.
offset
=
-
1
,
.
winInfo
.
pos
.
pageId
=
-
1
,
.
winInfo
.
win
.
skey
=
ts
,
.
winInfo
.
win
.
ekey
=
ts
,
.
winInfo
.
isOutput
=
false
,
.
winInfo
.
isClosed
=
false
,
};
if
(
IS_VAR_DATA_TYPE
(
win
.
stateKey
.
type
))
{
varDataCopy
(
win
.
stateKey
.
pData
,
pKeyData
);
}
else
{
memcpy
(
win
.
stateKey
.
pData
,
pKeyData
,
win
.
stateKey
.
bytes
);
}
return
taosArrayInsert
(
pWinInfos
,
index
,
&
win
);
}
bool
isTsInWindow
(
SStateWindowInfo
*
pWin
,
TSKEY
ts
)
{
if
(
pWin
->
winInfo
.
win
.
skey
<=
ts
&&
ts
<=
pWin
->
winInfo
.
win
.
ekey
)
{
return
true
;
}
return
false
;
}
bool
isEqualStateKey
(
SStateWindowInfo
*
pWin
,
char
*
pKeyData
)
{
return
pKeyData
&&
compareVal
(
pKeyData
,
&
pWin
->
stateKey
);
}
SStateWindowInfo
*
getStateWindowByTs
(
SArray
*
pWinInfos
,
TSKEY
ts
,
int32_t
*
pIndex
)
{
int32_t
size
=
taosArrayGetSize
(
pWinInfos
);
int32_t
index
=
binarySearch
(
pWinInfos
,
size
,
ts
,
TSDB_ORDER_DESC
,
getStateWinTsKey
);
SStateWindowInfo
*
pWin
=
NULL
;
if
(
index
>=
0
)
{
pWin
=
taosArrayGet
(
pWinInfos
,
index
);
if
(
isTsInWindow
(
pWin
,
ts
))
{
*
pIndex
=
index
;
return
pWin
;
}
}
if
(
index
+
1
<
size
)
{
pWin
=
taosArrayGet
(
pWinInfos
,
index
+
1
);
if
(
isTsInWindow
(
pWin
,
ts
))
{
*
pIndex
=
index
+
1
;
return
pWin
;
}
}
*
pIndex
=
0
;
return
NULL
;
}
SStateWindowInfo
*
getStateWindow
(
SArray
*
pWinInfos
,
TSKEY
ts
,
char
*
pKeyData
,
SColumn
*
pCol
,
int32_t
*
pIndex
)
{
int32_t
size
=
taosArrayGetSize
(
pWinInfos
);
if
(
size
==
0
)
{
*
pIndex
=
0
;
return
addNewStateWindow
(
pWinInfos
,
ts
,
pKeyData
,
pCol
);
}
int32_t
index
=
binarySearch
(
pWinInfos
,
size
,
ts
,
TSDB_ORDER_DESC
,
getStateWinTsKey
);
SStateWindowInfo
*
pWin
=
NULL
;
if
(
index
>=
0
)
{
pWin
=
taosArrayGet
(
pWinInfos
,
index
);
if
(
isTsInWindow
(
pWin
,
ts
))
{
*
pIndex
=
index
;
return
pWin
;
}
}
if
(
index
+
1
<
size
)
{
pWin
=
taosArrayGet
(
pWinInfos
,
index
+
1
);
if
(
isTsInWindow
(
pWin
,
ts
)
||
isEqualStateKey
(
pWin
,
pKeyData
))
{
*
pIndex
=
index
+
1
;
return
pWin
;
}
}
if
(
index
>=
0
)
{
pWin
=
taosArrayGet
(
pWinInfos
,
index
);
if
(
isEqualStateKey
(
pWin
,
pKeyData
))
{
*
pIndex
=
index
;
return
pWin
;
}
}
if
(
index
==
size
-
1
)
{
*
pIndex
=
taosArrayGetSize
(
pWinInfos
);
return
addNewStateWindow
(
pWinInfos
,
ts
,
pKeyData
,
pCol
);
}
*
pIndex
=
index
+
1
;
return
insertNewStateWindow
(
pWinInfos
,
ts
,
pKeyData
,
index
+
1
,
pCol
);
}
int32_t
updateStateWindowInfo
(
SArray
*
pWinInfos
,
int32_t
winIndex
,
TSKEY
*
pTs
,
SColumnInfoData
*
pKeyCol
,
int32_t
rows
,
int32_t
start
,
bool
*
allEqual
,
SHashObj
*
pSeDelete
)
{
*
allEqual
=
true
;
SStateWindowInfo
*
pWinInfo
=
taosArrayGet
(
pWinInfos
,
winIndex
);
for
(
int32_t
i
=
start
;
i
<
rows
;
++
i
)
{
char
*
pKeyData
=
colDataGetData
(
pKeyCol
,
i
);
if
(
!
isTsInWindow
(
pWinInfo
,
pTs
[
i
]))
{
if
(
isEqualStateKey
(
pWinInfo
,
pKeyData
))
{
int32_t
size
=
taosArrayGetSize
(
pWinInfos
);
if
(
winIndex
+
1
<
size
)
{
SStateWindowInfo
*
pNextWin
=
taosArrayGet
(
pWinInfos
,
winIndex
+
1
);
// ts belongs to the next window
if
(
pTs
[
i
]
>=
pNextWin
->
winInfo
.
win
.
skey
)
{
return
i
-
start
;
}
}
}
else
{
return
i
-
start
;
}
}
if
(
pWinInfo
->
winInfo
.
win
.
skey
>
pTs
[
i
])
{
if
(
pSeDelete
&&
pWinInfo
->
winInfo
.
isOutput
)
{
taosHashPut
(
pSeDelete
,
&
pWinInfo
->
winInfo
.
pos
,
sizeof
(
SResultRowPosition
),
&
pWinInfo
->
winInfo
.
win
.
skey
,
sizeof
(
TSKEY
));
pWinInfo
->
winInfo
.
isOutput
=
false
;
}
pWinInfo
->
winInfo
.
win
.
skey
=
pTs
[
i
];
}
pWinInfo
->
winInfo
.
win
.
ekey
=
TMAX
(
pWinInfo
->
winInfo
.
win
.
ekey
,
pTs
[
i
]);
if
(
!
isEqualStateKey
(
pWinInfo
,
pKeyData
))
{
*
allEqual
=
false
;
}
}
return
rows
-
start
;
}
void
deleteWindow
(
SArray
*
pWinInfos
,
int32_t
index
)
{
ASSERT
(
index
>=
0
&&
index
<
taosArrayGetSize
(
pWinInfos
));
taosArrayRemove
(
pWinInfos
,
index
);
}
static
void
doClearStateWindows
(
SStreamAggSupporter
*
pAggSup
,
SSDataBlock
*
pBlock
,
int32_t
tsIndex
,
SColumn
*
pCol
,
int32_t
keyIndex
,
SHashObj
*
pSeUpdated
,
SHashObj
*
pSeDeleted
)
{
SColumnInfoData
*
pTsColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
tsIndex
);
SColumnInfoData
*
pKeyColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
keyIndex
);
TSKEY
*
tsCol
=
(
TSKEY
*
)
pTsColInfo
->
pData
;
bool
allEqual
=
false
;
int32_t
step
=
1
;
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
rows
;
i
+=
step
)
{
char
*
pKeyData
=
colDataGetData
(
pKeyColInfo
,
i
);
int32_t
winIndex
=
0
;
SStateWindowInfo
*
pCurWin
=
getStateWindowByTs
(
pAggSup
->
pResultRows
,
tsCol
[
i
],
&
winIndex
);
if
(
!
pCurWin
)
{
continue
;
}
step
=
updateStateWindowInfo
(
pAggSup
->
pResultRows
,
winIndex
,
tsCol
,
pKeyColInfo
,
pBlock
->
info
.
rows
,
i
,
&
allEqual
,
pSeDeleted
);
ASSERT
(
isTsInWindow
(
pCurWin
,
tsCol
[
i
])
||
isEqualStateKey
(
pCurWin
,
pKeyData
));
taosArrayPush
(
pAggSup
->
pScanWindow
,
&
pCurWin
->
winInfo
.
win
);
taosHashRemove
(
pSeUpdated
,
&
pCurWin
->
winInfo
.
pos
,
sizeof
(
SResultRowPosition
));
deleteWindow
(
pAggSup
->
pResultRows
,
winIndex
);
}
}
static
void
doStreamStateAggImpl
(
SOperatorInfo
*
pOperator
,
SSDataBlock
*
pSDataBlock
,
SHashObj
*
pSeUpdated
,
SHashObj
*
pStDeleted
)
{
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SStreamStateAggOperatorInfo
*
pInfo
=
pOperator
->
info
;
bool
masterScan
=
true
;
int32_t
numOfOutput
=
pOperator
->
numOfExprs
;
int64_t
groupId
=
pSDataBlock
->
info
.
groupId
;
int64_t
code
=
TSDB_CODE_SUCCESS
;
int32_t
step
=
1
;
bool
ascScan
=
true
;
TSKEY
*
tsCols
=
NULL
;
SResultRow
*
pResult
=
NULL
;
int32_t
winRows
=
0
;
if
(
pSDataBlock
->
pDataBlock
!=
NULL
)
{
SColumnInfoData
*
pColDataInfo
=
taosArrayGet
(
pSDataBlock
->
pDataBlock
,
pInfo
->
primaryTsIndex
);
tsCols
=
(
int64_t
*
)
pColDataInfo
->
pData
;
}
else
{
return
;
}
SStreamAggSupporter
*
pAggSup
=
&
pInfo
->
streamAggSup
;
SColumnInfoData
*
pKeyColInfo
=
taosArrayGet
(
pSDataBlock
->
pDataBlock
,
pInfo
->
stateCol
.
slotId
);
for
(
int32_t
i
=
0
;
i
<
pSDataBlock
->
info
.
rows
;
i
+=
winRows
)
{
char
*
pKeyData
=
colDataGetData
(
pKeyColInfo
,
i
);
int32_t
winIndex
=
0
;
bool
allEqual
=
true
;
SStateWindowInfo
*
pCurWin
=
getStateWindow
(
pAggSup
->
pResultRows
,
tsCols
[
i
],
pKeyData
,
&
pInfo
->
stateCol
,
&
winIndex
);
winRows
=
updateStateWindowInfo
(
pAggSup
->
pResultRows
,
winIndex
,
tsCols
,
pKeyColInfo
,
pSDataBlock
->
info
.
rows
,
i
,
&
allEqual
,
pInfo
->
pSeDeleted
);
if
(
!
allEqual
)
{
taosArrayPush
(
pAggSup
->
pScanWindow
,
&
pCurWin
->
winInfo
.
win
);
taosHashRemove
(
pSeUpdated
,
&
pCurWin
->
winInfo
.
pos
,
sizeof
(
SResultRowPosition
));
deleteWindow
(
pAggSup
->
pResultRows
,
winIndex
);
continue
;
}
code
=
doOneStateWindowAgg
(
pInfo
,
pSDataBlock
,
&
pCurWin
->
winInfo
,
&
pResult
,
i
,
winRows
,
numOfOutput
,
pTaskInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
||
pResult
==
NULL
)
{
longjmp
(
pTaskInfo
->
env
,
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
pCurWin
->
winInfo
.
isClosed
=
false
;
if
(
pInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_AT_ONCE
)
{
code
=
taosHashPut
(
pSeUpdated
,
&
pCurWin
->
winInfo
.
pos
,
sizeof
(
SResultRowPosition
),
&
(
pCurWin
->
winInfo
.
win
.
skey
),
sizeof
(
TSKEY
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pTaskInfo
->
env
,
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
pCurWin
->
winInfo
.
isOutput
=
true
;
}
}
}
static
SSDataBlock
*
doStreamStateAgg
(
SOperatorInfo
*
pOperator
)
{
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
return
NULL
;
}
SStreamStateAggOperatorInfo
*
pInfo
=
pOperator
->
info
;
SOptrBasicInfo
*
pBInfo
=
&
pInfo
->
binfo
;
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
doBuildDeleteDataBlock
(
pInfo
->
pSeDeleted
,
pInfo
->
pDelRes
,
&
pInfo
->
pDelIterator
);
if
(
pInfo
->
pDelRes
->
info
.
rows
>
0
)
{
return
pInfo
->
pDelRes
;
}
doBuildResultDatablock
(
pOperator
,
pBInfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
streamAggSup
.
pResultBuf
);
if
(
pBInfo
->
pRes
->
info
.
rows
==
0
||
!
hashRemainDataInGroupInfo
(
&
pInfo
->
groupResInfo
))
{
doSetOperatorCompleted
(
pOperator
);
}
return
pBInfo
->
pRes
->
info
.
rows
==
0
?
NULL
:
pBInfo
->
pRes
;
}
_hash_fn_t
hashFn
=
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
);
SHashObj
*
pSeUpdated
=
taosHashInit
(
64
,
hashFn
,
true
,
HASH_NO_LOCK
);
SOperatorInfo
*
downstream
=
pOperator
->
pDownstream
[
0
];
while
(
1
)
{
SSDataBlock
*
pBlock
=
downstream
->
fpSet
.
getNextFn
(
downstream
);
if
(
pBlock
==
NULL
)
{
break
;
}
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock
(
pOperator
,
pBInfo
->
pCtx
,
pBlock
,
TSDB_ORDER_ASC
,
MAIN_SCAN
,
true
);
if
(
pBlock
->
info
.
type
==
STREAM_REPROCESS
)
{
doClearStateWindows
(
&
pInfo
->
streamAggSup
,
pBlock
,
pInfo
->
primaryTsIndex
,
&
pInfo
->
stateCol
,
pInfo
->
stateCol
.
slotId
,
pSeUpdated
,
pInfo
->
pSeDeleted
);
continue
;
}
doStreamStateAggImpl
(
pOperator
,
pBlock
,
pSeUpdated
,
pInfo
->
pSeDeleted
);
pInfo
->
twAggSup
.
maxTs
=
TMAX
(
pInfo
->
twAggSup
.
maxTs
,
pBlock
->
info
.
window
.
ekey
);
}
// restore the value
pOperator
->
status
=
OP_RES_TO_RETURN
;
SArray
*
pClosed
=
taosArrayInit
(
16
,
POINTER_BYTES
);
closeSessionWindow
(
pInfo
->
streamAggSup
.
pResultRows
,
&
pInfo
->
twAggSup
,
pClosed
,
pInfo
->
twAggSup
.
calTrigger
,
getStateWinInfo
);
SArray
*
pUpdated
=
taosArrayInit
(
16
,
POINTER_BYTES
);
copyUpdateResult
(
pSeUpdated
,
pUpdated
,
pBInfo
->
pRes
->
info
.
groupId
);
taosHashCleanup
(
pSeUpdated
);
if
(
pInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_WINDOW_CLOSE
)
{
taosArrayAddAll
(
pUpdated
,
pClosed
);
}
finalizeUpdatedResult
(
pOperator
->
numOfExprs
,
pInfo
->
streamAggSup
.
pResultBuf
,
pUpdated
,
pInfo
->
binfo
.
rowCellInfoOffset
);
initMultiResInfoFromArrayList
(
&
pInfo
->
groupResInfo
,
pUpdated
);
blockDataEnsureCapacity
(
pInfo
->
binfo
.
pRes
,
pOperator
->
resultInfo
.
capacity
);
doBuildDeleteDataBlock
(
pInfo
->
pSeDeleted
,
pInfo
->
pDelRes
,
&
pInfo
->
pDelIterator
);
if
(
pInfo
->
pDelRes
->
info
.
rows
>
0
)
{
return
pInfo
->
pDelRes
;
}
doBuildResultDatablock
(
pOperator
,
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
streamAggSup
.
pResultBuf
);
return
pBInfo
->
pRes
->
info
.
rows
==
0
?
NULL
:
pBInfo
->
pRes
;
}
SOperatorInfo
*
createStreamStateAggOperatorInfo
(
SOperatorInfo
*
downstream
,
SPhysiNode
*
pPhyNode
,
SExecTaskInfo
*
pTaskInfo
)
{
SStreamStateWinodwPhysiNode
*
pStateNode
=
(
SStreamStateWinodwPhysiNode
*
)
pPhyNode
;
SSDataBlock
*
pResBlock
=
createResDataBlock
(
pPhyNode
->
pOutputDataBlockDesc
);
int32_t
tsSlotId
=
((
SColumnNode
*
)
pStateNode
->
window
.
pTspk
)
->
slotId
;
SColumnNode
*
pColNode
=
(
SColumnNode
*
)((
STargetNode
*
)
pStateNode
->
pStateKey
)
->
pExpr
;
SStreamStateAggOperatorInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamStateAggOperatorInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
goto
_error
;
}
int32_t
numOfCols
=
0
;
SExprInfo
*
pExprInfo
=
createExprInfo
(
pStateNode
->
window
.
pFuncs
,
NULL
,
&
numOfCols
);
pInfo
->
stateCol
=
extractColumnFromColumnNode
(
pColNode
);
initResultSizeInfo
(
pOperator
,
4096
);
initResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
,
8
);
pInfo
->
twAggSup
=
(
STimeWindowAggSupp
)
{.
waterMark
=
pStateNode
->
window
.
watermark
,
.
calTrigger
=
pStateNode
->
window
.
triggerType
,
.
maxTs
=
INT64_MIN
,
.
winMap
=
NULL
,};
initExecTimeWindowInfo
(
&
pInfo
->
twAggSup
.
timeWindowData
,
&
pTaskInfo
->
window
);
int32_t
code
=
initStateAggSupporter
(
&
pInfo
->
streamAggSup
,
"StreamStateAggOperatorInfo"
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
code
=
initBiasicInfo
(
&
pInfo
->
binfo
,
pExprInfo
,
numOfCols
,
pResBlock
,
pInfo
->
streamAggSup
.
pResultBuf
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
pInfo
->
streamAggSup
.
resultRowSize
=
getResultRowSize
(
pInfo
->
binfo
.
pCtx
,
numOfCols
);
pInfo
->
pDummyCtx
=
(
SqlFunctionCtx
*
)
taosMemoryCalloc
(
numOfCols
,
sizeof
(
SqlFunctionCtx
));
if
(
pInfo
->
pDummyCtx
==
NULL
)
{
goto
_error
;
}
initDummyFunction
(
pInfo
->
pDummyCtx
,
pInfo
->
binfo
.
pCtx
,
numOfCols
);
pInfo
->
primaryTsIndex
=
tsSlotId
;
pInfo
->
order
=
TSDB_ORDER_ASC
;
_hash_fn_t
hashFn
=
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
);
pInfo
->
pSeDeleted
=
taosHashInit
(
64
,
hashFn
,
true
,
HASH_NO_LOCK
);
pInfo
->
pDelIterator
=
NULL
;
pInfo
->
pDelRes
=
createOneDataBlock
(
pResBlock
,
false
);
blockDataEnsureCapacity
(
pInfo
->
pDelRes
,
64
);
pInfo
->
pChildren
=
NULL
;
pOperator
->
name
=
"StreamStateAggOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW
;
pOperator
->
blocking
=
true
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
numOfExprs
=
numOfCols
;
pOperator
->
pExpr
=
pExprInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
info
=
pInfo
;
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doStreamStateAgg
,
NULL
,
NULL
,
destroyStreamStateOperatorInfo
,
aggEncodeResultRow
,
aggDecodeResultRow
,
NULL
);
initDownStream
(
downstream
,
&
pInfo
->
streamAggSup
,
0
,
pInfo
->
twAggSup
.
waterMark
,
pOperator
->
operatorType
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
return
pOperator
;
_error:
destroyStreamStateOperatorInfo
(
pInfo
,
numOfCols
);
taosMemoryFreeClear
(
pInfo
);
taosMemoryFreeClear
(
pOperator
);
pTaskInfo
->
code
=
code
;
return
NULL
;
}
source/libs/function/src/builtinsimpl.c
浏览文件 @
497c27d6
...
...
@@ -610,8 +610,7 @@ int32_t sumCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
}
else
if
(
type
==
TSDB_DATA_TYPE_DOUBLE
||
type
==
TSDB_DATA_TYPE_FLOAT
)
{
pDBuf
->
dsum
+=
pSBuf
->
dsum
;
}
SET_VAL
(
pDResInfo
,
*
((
int64_t
*
)
pDBuf
),
1
);
pDResInfo
->
numOfRes
=
TMAX
(
pDResInfo
->
numOfRes
,
pSResInfo
->
numOfRes
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1394,7 +1393,7 @@ int32_t minMaxCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int3
pDBuf
->
v
=
pSBuf
->
v
;
}
}
SET_VAL
(
pDResInfo
,
*
((
int64_t
*
)
pDBuf
),
1
);
pDResInfo
->
numOfRes
=
TMAX
(
pDResInfo
->
numOfRes
,
pSResInfo
->
numOfRes
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1639,6 +1638,7 @@ int32_t stddevCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
pDBuf
->
quadraticDSum
+=
pSBuf
->
quadraticDSum
;
}
pDBuf
->
count
+=
pSBuf
->
count
;
pDResInfo
->
numOfRes
=
TMAX
(
pDResInfo
->
numOfRes
,
pSResInfo
->
numOfRes
);
return
TSDB_CODE_SUCCESS
;
}
...
...
source/libs/nodes/src/nodesCodeFuncs.c
浏览文件 @
497c27d6
...
...
@@ -244,6 +244,8 @@ const char* nodesNodeName(ENodeType type) {
return
"PhysiStreamSessionWindow"
;
case
QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW
:
return
"PhysiStateWindow"
;
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW
:
return
"PhysiStreamStateWindow"
;
case
QUERY_NODE_PHYSICAL_PLAN_PARTITION
:
return
"PhysiPartition"
;
case
QUERY_NODE_PHYSICAL_PLAN_DISPATCH
:
...
...
@@ -2744,6 +2746,28 @@ static int32_t jsonToOrderByExprNode(const SJson* pJson, void* pObj) {
return
code
;
}
static
const
char
*
jkStateWindowCol
=
"StateWindowCol"
;
static
const
char
*
jkStateWindowExpr
=
"StateWindowExpr"
;
static
int32_t
stateWindowNodeToJson
(
const
void
*
pObj
,
SJson
*
pJson
)
{
const
SStateWindowNode
*
pNode
=
(
const
SStateWindowNode
*
)
pObj
;
int32_t
code
=
tjsonAddObject
(
pJson
,
jkStateWindowCol
,
nodeToJson
,
pNode
->
pCol
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddObject
(
pJson
,
jkStateWindowExpr
,
nodeToJson
,
pNode
->
pExpr
);
}
return
code
;
}
static
int32_t
jsonToStateWindowNode
(
const
SJson
*
pJson
,
void
*
pObj
)
{
SStateWindowNode
*
pNode
=
(
SStateWindowNode
*
)
pObj
;
int32_t
code
=
jsonToNodeObject
(
pJson
,
jkStateWindowCol
,
(
SNode
**
)
&
pNode
->
pCol
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
jsonToNodeObject
(
pJson
,
jkStateWindowExpr
,
(
SNode
**
)
&
pNode
->
pExpr
);
}
return
code
;
}
static
const
char
*
jkSessionWindowTsPrimaryKey
=
"TsPrimaryKey"
;
static
const
char
*
jkSessionWindowGap
=
"Gap"
;
...
...
@@ -3521,8 +3545,9 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
case
QUERY_NODE_ORDER_BY_EXPR
:
return
orderByExprNodeToJson
(
pObj
,
pJson
);
case
QUERY_NODE_LIMIT
:
case
QUERY_NODE_STATE_WINDOW
:
break
;
case
QUERY_NODE_STATE_WINDOW
:
return
stateWindowNodeToJson
(
pObj
,
pJson
);
case
QUERY_NODE_SESSION_WINDOW
:
return
sessionWindowNodeToJson
(
pObj
,
pJson
);
case
QUERY_NODE_INTERVAL_WINDOW
:
...
...
@@ -3626,6 +3651,7 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW
:
return
physiSessionWindowNodeToJson
(
pObj
,
pJson
);
case
QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW
:
return
physiStateWindowNodeToJson
(
pObj
,
pJson
);
case
QUERY_NODE_PHYSICAL_PLAN_PARTITION
:
return
physiPartitionNodeToJson
(
pObj
,
pJson
);
...
...
@@ -3662,6 +3688,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return
jsonToTempTableNode
(
pJson
,
pObj
);
case
QUERY_NODE_ORDER_BY_EXPR
:
return
jsonToOrderByExprNode
(
pJson
,
pObj
);
case
QUERY_NODE_STATE_WINDOW
:
return
jsonToStateWindowNode
(
pJson
,
pObj
);
case
QUERY_NODE_SESSION_WINDOW
:
return
jsonToSessionWindowNode
(
pJson
,
pObj
);
case
QUERY_NODE_INTERVAL_WINDOW
:
...
...
@@ -3745,6 +3773,7 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW
:
return
jsonToPhysiSessionWindowNode
(
pJson
,
pObj
);
case
QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW
:
return
jsonToPhysiStateWindowNode
(
pJson
,
pObj
);
case
QUERY_NODE_PHYSICAL_PLAN_PARTITION
:
return
jsonToPhysiPartitionNode
(
pJson
,
pObj
);
...
...
source/libs/nodes/src/nodesTraverseFuncs.c
浏览文件 @
497c27d6
...
...
@@ -520,7 +520,8 @@ static EDealRes dispatchPhysiPlan(SNode* pNode, ETraversalOrder order, FNodeWalk
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW
:
res
=
walkWindowPhysi
((
SWinodwPhysiNode
*
)
pNode
,
order
,
walker
,
pContext
);
break
;
case
QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW
:
{
case
QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW
:
{
SStateWinodwPhysiNode
*
pState
=
(
SStateWinodwPhysiNode
*
)
pNode
;
res
=
walkWindowPhysi
((
SWinodwPhysiNode
*
)
pNode
,
order
,
walker
,
pContext
);
if
(
DEAL_RES_ERROR
!=
res
&&
DEAL_RES_END
!=
res
)
{
...
...
source/libs/nodes/src/nodesUtilFuncs.c
浏览文件 @
497c27d6
...
...
@@ -274,6 +274,8 @@ SNodeptr nodesMakeNode(ENodeType type) {
return
makeNode
(
type
,
sizeof
(
SStreamSessionWinodwPhysiNode
));
case
QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW
:
return
makeNode
(
type
,
sizeof
(
SStateWinodwPhysiNode
));
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW
:
return
makeNode
(
type
,
sizeof
(
SStreamStateWinodwPhysiNode
));
case
QUERY_NODE_PHYSICAL_PLAN_PARTITION
:
return
makeNode
(
type
,
sizeof
(
SPartitionPhysiNode
));
case
QUERY_NODE_PHYSICAL_PLAN_DISPATCH
:
...
...
source/libs/planner/src/planOptimizer.c
浏览文件 @
497c27d6
...
...
@@ -108,7 +108,8 @@ static bool osdMayBeOptimized(SLogicNode* pNode) {
return
false
;
}
if
(
QUERY_NODE_LOGIC_PLAN_WINDOW
==
nodeType
(
pNode
->
pParent
))
{
return
(
WINDOW_TYPE_INTERVAL
==
((
SWindowLogicNode
*
)
pNode
->
pParent
)
->
winType
);
return
true
;
// return (WINDOW_TYPE_INTERVAL == ((SWindowLogicNode*)pNode->pParent)->winType);
}
return
!
osdHaveNormalCol
(((
SAggLogicNode
*
)
pNode
->
pParent
)
->
pGroupKeys
);
}
...
...
@@ -217,8 +218,7 @@ static int32_t osdGetDataRequired(SNodeList* pFuncs) {
}
static
void
setScanWindowInfo
(
SScanLogicNode
*
pScan
)
{
if
(
QUERY_NODE_LOGIC_PLAN_WINDOW
==
nodeType
(
pScan
->
node
.
pParent
)
&&
WINDOW_TYPE_INTERVAL
==
((
SWindowLogicNode
*
)
pScan
->
node
.
pParent
)
->
winType
)
{
if
(
QUERY_NODE_LOGIC_PLAN_WINDOW
==
nodeType
(
pScan
->
node
.
pParent
))
{
pScan
->
interval
=
((
SWindowLogicNode
*
)
pScan
->
node
.
pParent
)
->
interval
;
pScan
->
offset
=
((
SWindowLogicNode
*
)
pScan
->
node
.
pParent
)
->
offset
;
pScan
->
sliding
=
((
SWindowLogicNode
*
)
pScan
->
node
.
pParent
)
->
sliding
;
...
...
source/libs/planner/src/planPhysiCreater.c
浏览文件 @
497c27d6
...
...
@@ -980,7 +980,9 @@ static int32_t createSessionWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList*
static
int32_t
createStateWindowPhysiNode
(
SPhysiPlanContext
*
pCxt
,
SNodeList
*
pChildren
,
SWindowLogicNode
*
pWindowLogicNode
,
SPhysiNode
**
pPhyNode
)
{
SStateWinodwPhysiNode
*
pState
=
(
SStateWinodwPhysiNode
*
)
makePhysiNode
(
pCxt
,
getPrecision
(
pChildren
),
(
SLogicNode
*
)
pWindowLogicNode
,
QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW
);
pCxt
,
getPrecision
(
pChildren
),
(
SLogicNode
*
)
pWindowLogicNode
,
(
pCxt
->
pPlanCxt
->
streamQuery
?
QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW
:
QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW
));
if
(
NULL
==
pState
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
...
...
tests/script/tsim/stream/session0.sim
浏览文件 @
497c27d6
...
...
@@ -70,42 +70,42 @@ endi
# row 1
if $data11 != 3 then
print ======$data
0
1
print ======$data
1
1
return -1
endi
if $data12 != 10 then
print ======$data
0
2
print ======$data
1
2
return -1
endi
if $data13 != 10 then
print ======$data
0
3
print ======$data
1
3
return -1
endi
if $data14 != 1.100000000 then
print ======$data
0
4
print ======$data
1
4
return -1
endi
if $data15 != 0.000000000 then
print ======$data
0
5
print ======$data
1
5
return -1
endi
if $data16 != 10 then
print ======$data
0
5
print ======$data
1
5
return -1
endi
if $data17 != 1.100000000 then
print ======$data
05
print ======$data
17
return -1
endi
if $data18 != 5 then
print ======$data
05
print ======$data
18
return -1
endi
...
...
@@ -145,17 +145,17 @@ if $data05 != 0.816496581 then
endi
if $data06 != 3 then
print ======$data0
5
print ======$data0
6
return -1
endi
if $data07 != 1.100000000 then
print ======$data0
5
print ======$data0
7
return -1
endi
if $data08 != 13 then
print ======$data0
5
print ======$data0
8
return -1
endi
...
...
tests/script/tsim/stream/session1.sim
浏览文件 @
497c27d6
...
...
@@ -187,4 +187,14 @@ if $data34 != 19 then
return -1
endi
sql insert into t1 values(1648791000000,1,1,1,1.1,23);
sleep 300
sql select * from streamt order by s desc;
# row 0
if $data01 != 1 then
print ======$data01
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
tests/script/tsim/stream/state0.sim
0 → 100644
浏览文件 @
497c27d6
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print =============== create database
sql create database test vgroups 1
sql show databases
if $rows != 3 then
return -1
endi
print $data00 $data01 $data02
sql use test
sql create table t1(ts timestamp, a int, b int , c int, d double, id int);
sql create stream streams1 trigger at_once into streamt1 as select _wstartts, count(*) c1, count(d) c2 , sum(a) c3 , max(a) c4, min(c) c5, max(id) c from t1 state_window(a);
sql insert into t1 values(1648791213000,1,2,3,1.0,1);
sql insert into t1 values(1648791213000,1,2,3,1.0,2);
sql select * from streamt1 order by c desc;
sleep 300
if $rows != 1 then
print ======$rows
return -1;
endi
sql insert into t1 values(1648791214000,1,2,3,1.0,3);
sql select * from streamt1 order by c desc;
sleep 300
if $rows != 1 then
print ======$rows
return -1;
endi
sql insert into t1 values(1648791213010,2,2,3,1.0,4);
sql insert into t1 values(1648791213000,1,2,3,1.0,5);
sql insert into t1 values(1648791214000,1,2,3,1.0,6);
$loop_count = 0
loop1:
sql select * from streamt1 where c >=4 order by `_wstartts`;
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 3 then
print ======$rows
goto loop1
return -1
endi
# row 0
if $data01 != 1 then
print ======$data01
return -1
endi
if $data02 != 1 then
print ======$data02
return -1
endi
if $data03 != 1 then
print ======$data03
return -1
endi
if $data04 != 1 then
print ======$data04
return -1
endi
if $data05 != 3 then
print ======$data05
return -1
endi
if $data06 != 5 then
print ======$data06
return -1
endi
# row 1
if $data11 != 1 then
print ======$data11
return -1
endi
if $data12 != 1 then
print ======$data12
return -1
endi
if $data13 != 2 then
print ======$data13
return -1
endi
if $data14 != 2 then
print ======$data14
return -1
endi
if $data15 != 3 then
print ======$data15
return -1
endi
if $data16 != 4 then
print ======$data16
return -1
endi
# row 2
if $data21 != 1 then
print ======$data21
return -1
endi
if $data22 != 1 then
print ======$data22
return -1
endi
if $data23 != 1 then
print ======$data23
return -1
endi
if $data24 != 1 then
print ======$data24
return -1
endi
if $data25 != 3 then
print ======$data25
return -1
endi
if $data26 != 6 then
print ======$data26
return -1
endi
sql insert into t1 values(1648791213011,1,2,3,1.0,7);
loop2:
$loop_count = 0
sql select * from streamt1 where c in (5,4,7) order by `_wstartts`;
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
# row 2
if $data21 != 2 then
print ======$data21
goto loop2
return -1
endi
if $data22 != 2 then
print ======$data22
goto loop2
return -1
endi
if $data23 != 2 then
print ======$data23
goto loop2
return -1
endi
if $data24 != 1 then
print ======$data24
goto loop2
return -1
endi
if $data25 != 3 then
print ======$data25
goto loop2
return -1
endi
if $data26 != 7 then
print ======$data26
goto loop2
return -1
endi
sql insert into t1 values(1648791213011,1,2,3,1.0,8);
loop21:
$loop_count = 0
sql select * from streamt1 where c in (5,4,8) order by `_wstartts`;
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data26 != 8 then
print ======$data26
goto loop21
return -1
endi
sql insert into t1 values(1648791213020,1,2,3,1.0,9);
sql insert into t1 values(1648791213020,3,2,3,1.0,10);
sql insert into t1 values(1648791214000,1,2,3,1.0,11);
sql insert into t1 values(1648791213011,10,20,10,10.0,12);
loop3:
$loop_count = 0
sql select * from streamt1 where c in (5,4,10,11,12) order by `_wstartts`;
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
# row 2
if $data21 != 1 then
print ======$data21
goto loop3
return -1
endi
if $data22 != 1 then
print ======$data22
goto loop3
return -1
endi
if $data23 != 10 then
print ======$data23
goto loop3
return -1
endi
if $data24 != 10 then
print ======$data24
goto loop3
return -1
endi
if $data25 != 10 then
print ======$data25
goto loop3
return -1
endi
if $data26 != 12 then
print ======$data26
goto loop3
return -1
endi
# row 3
if $data31 != 1 then
print ======$data31
goto loop3
return -1
endi
if $data32 != 1 then
print ======$data32
goto loop3
return -1
endi
if $data33 != 3 then
print ======$data33
goto loop3
return -1
endi
if $data34 != 3 then
print ======$data34
goto loop3
return -1
endi
if $data35 != 3 then
print ======$data35
goto loop3
return -1
endi
if $data36 != 10 then
print ======$data36
goto loop3
return -1
endi
# row 4
if $data41 != 1 then
print ======$data41
goto loop3
return -1
endi
if $data42 != 1 then
print ======$data42
goto loop3
return -1
endi
if $data43 != 1 then
print ======$data43
goto loop3
return -1
endi
if $data44 != 1 then
print ======$data44
goto loop3
return -1
endi
if $data45 != 3 then
print ======$data45
goto loop3
return -1
endi
if $data46 != 11 then
print ======$data46
goto loop3
return -1
endi
sql insert into t1 values(1648791213030,3,12,12,12.0,13);
sql insert into t1 values(1648791214040,1,13,13,13.0,14);
sql insert into t1 values(1648791213030,3,14,14,14.0,15) (1648791214020,15,15,15,15.0,16);
loop4:
$loop_count = 0
sql select * from streamt1 where c in (14,15,16) order by `_wstartts`;
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 3 then
print ======$rows
goto loop4
return -1;
endi
# row 0
if $data01 != 2 then
print ======$data01
goto loop4
return -1
endi
if $data02 != 2 then
print ======$data02
goto loop4
return -1
endi
if $data03 != 6 then
print ======$data03
goto loop4
return -1
endi
if $data04 != 3 then
print ======$data04
goto loop4
return -1
endi
if $data05 != 3 then
print ======$data05
goto loop4
return -1
endi
if $data06 != 15 then
print ======$data06
goto loop4
return -1
endi
# row 1
if $data11 != 1 then
print ======$data11
goto loop4
return -1
endi
if $data12 != 1 then
print ======$data12
goto loop4
return -1
endi
if $data13 != 15 then
print ======$data13
goto loop4
return -1
endi
if $data14 != 15 then
print ======$data14
goto loop4
return -1
endi
if $data15 != 15 then
print ======$data15
goto loop4
return -1
endi
if $data16 != 16 then
print ======$data16
goto loop4
return -1
endi
# row 2
if $data21 != 1 then
print ======$data21
goto loop4
return -1
endi
if $data22 != 1 then
print ======$data22
goto loop4
return -1
endi
if $data23 != 1 then
print ======$data23
goto loop4
return -1
endi
if $data24 != 1 then
print ======$data24
goto loop4
return -1
endi
if $data25 != 13 then
print ======$data25
goto loop4
return -1
endi
if $data26 != 14 then
print ======$data26
goto loop4
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
tests/script/tsim/stream/triggerSession0.sim
浏览文件 @
497c27d6
...
...
@@ -102,4 +102,4 @@ if $data21 != 1 then
return -1
endi
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录