Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
6e49b7bb
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
6e49b7bb
编写于
6月 15, 2022
作者:
L
liuyao
提交者:
GitHub
6月 15, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #13862 from taosdata/feature/TD-16512
feat(stream): stream state&session support partition by
上级
fd9bb8db
87bcbe00
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
108 addition
and
87 deletion
+108
-87
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+5
-3
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+5
-2
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+1
-1
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+97
-81
未找到文件。
source/libs/executor/inc/executorimpl.h
浏览文件 @
6e49b7bb
...
...
@@ -365,7 +365,9 @@ typedef struct SCatchSupporter {
}
SCatchSupporter
;
typedef
struct
SStreamAggSupporter
{
SArray
*
pResultRows
;
SHashObj
*
pResultRows
;
SArray
*
pCurWins
;
int32_t
valueSize
;
int32_t
keySize
;
char
*
pKeyBuf
;
// window key buffer
SDiskbasedBuf
*
pResultBuf
;
// query result buffer based on blocked-wised disk file
...
...
@@ -899,9 +901,9 @@ int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimary
__block_search_fn_t
searchFn
,
STableQueryInfo
*
item
,
int32_t
order
);
int32_t
binarySearchForKey
(
char
*
pValue
,
int
num
,
TSKEY
key
,
int
order
);
int32_t
initStreamAggSupporter
(
SStreamAggSupporter
*
pSup
,
const
char
*
pKey
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
size
_t
size
);
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
int32
_t
size
);
SResultRow
*
getNewResultRow
(
SDiskbasedBuf
*
pResultBuf
,
int64_t
tableGroupId
,
int32_t
interBufSize
);
SResultWindowInfo
*
getSessionTimeWindow
(
S
Array
*
pWinInfos
,
TSKEY
ts
,
int64_t
gap
,
int32_t
*
pIndex
);
SResultWindowInfo
*
getSessionTimeWindow
(
S
StreamAggSupporter
*
pAggSup
,
TSKEY
ts
,
uint64_t
groupId
,
int64_t
gap
,
int32_t
*
pIndex
);
int32_t
updateSessionWindowInfo
(
SResultWindowInfo
*
pWinInfo
,
TSKEY
*
pTs
,
int32_t
rows
,
int32_t
start
,
int64_t
gap
,
SHashObj
*
pStDeleted
);
bool
functionNeedToExecute
(
SqlFunctionCtx
*
pCtx
);
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
6e49b7bb
...
...
@@ -4814,6 +4814,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
.
calTrigger
=
pIntervalPhyNode
->
window
.
triggerType
,
.
maxTs
=
INT64_MIN
,
};
ASSERT
(
as
.
calTrigger
!=
STREAM_TRIGGER_MAX_DELAY
);
int32_t
tsSlotId
=
((
SColumnNode
*
)
pIntervalPhyNode
->
window
.
pTspk
)
->
slotId
;
bool
isStream
=
(
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
==
type
);
...
...
@@ -5498,14 +5499,16 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo
}
int32_t
initStreamAggSupporter
(
SStreamAggSupporter
*
pSup
,
const
char
*
pKey
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
size
_t
size
)
{
int32
_t
size
)
{
pSup
->
resultRowSize
=
getResultRowSize
(
pCtx
,
numOfOutput
);
pSup
->
keySize
=
sizeof
(
int64_t
)
+
sizeof
(
TSKEY
);
pSup
->
pKeyBuf
=
taosMemoryCalloc
(
1
,
pSup
->
keySize
);
pSup
->
pResultRows
=
taosArrayInit
(
1024
,
size
);
_hash_fn_t
hashFn
=
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
);
pSup
->
pResultRows
=
taosHashInit
(
1024
,
hashFn
,
false
,
HASH_NO_LOCK
);
if
(
pSup
->
pKeyBuf
==
NULL
||
pSup
->
pResultRows
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
pSup
->
valueSize
=
size
;
pSup
->
pScanWindow
=
taosArrayInit
(
4
,
sizeof
(
STimeWindow
));
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
6e49b7bb
...
...
@@ -716,7 +716,7 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo) {
int64_t
gap
=
pInfo
->
sessionSup
.
gap
;
int32_t
winIndex
=
0
;
SResultWindowInfo
*
pCurWin
=
getSessionTimeWindow
(
pAggSup
->
pResultRows
,
tsCols
[
pInfo
->
updateResIndex
]
,
gap
,
&
winIndex
);
getSessionTimeWindow
(
pAggSup
,
tsCols
[
pInfo
->
updateResIndex
],
pSDB
->
info
.
groupId
,
gap
,
&
winIndex
);
win
=
pCurWin
->
win
;
pInfo
->
updateResIndex
+=
updateSessionWindowInfo
(
pCurWin
,
tsCols
,
pSDB
->
info
.
rows
,
pInfo
->
updateResIndex
,
gap
,
NULL
);
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
6e49b7bb
...
...
@@ -1320,24 +1320,23 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
break
;
}
// The timewindow that overlaps the timestamps of the input pBlock need to be recalculated and return to the
// caller. Note that all the time window are not close till now.
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock
(
pOperator
,
pInfo
->
binfo
.
pCtx
,
pBlock
,
pInfo
->
order
,
MAIN_SCAN
,
true
);
if
(
pInfo
->
invertible
)
{
setInverFunction
(
pInfo
->
binfo
.
pCtx
,
pOperator
->
numOfExprs
,
pBlock
->
info
.
type
);
}
if
(
pBlock
->
info
.
type
==
STREAM_REPROCESS
)
{
doClearWindows
(
&
pInfo
->
aggSup
,
&
pInfo
->
binfo
,
&
pInfo
->
interval
,
0
,
pOperator
->
numOfExprs
,
pBlock
,
NULL
);
qDebug
(
"%s clear existed time window results for updates checked"
,
GET_TASKID
(
pTaskInfo
));
continue
;
}
else
if
(
pBlock
->
info
.
type
==
STREAM_GET_ALL
&&
pInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_MAX_DELAY
)
{
}
else
if
(
pBlock
->
info
.
type
==
STREAM_GET_ALL
)
{
getAllIntervalWindow
(
pInfo
->
aggSup
.
pResultRowHashTable
,
pUpdated
);
continue
;
}
// The timewindow that overlaps the timestamps of the input pBlock need to be recalculated and return to the
// caller. Note that all the time window are not close till now.
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock
(
pOperator
,
pInfo
->
binfo
.
pCtx
,
pBlock
,
pInfo
->
order
,
MAIN_SCAN
,
true
);
if
(
pInfo
->
invertible
)
{
setInverFunction
(
pInfo
->
binfo
.
pCtx
,
pOperator
->
numOfExprs
,
pBlock
->
info
.
type
);
}
pInfo
->
twAggSup
.
maxTs
=
TMAX
(
pInfo
->
twAggSup
.
maxTs
,
pBlock
->
info
.
window
.
ekey
);
hashIntervalAgg
(
pOperator
,
&
pInfo
->
binfo
.
resultRowInfo
,
pBlock
,
MAIN_SCAN
,
pUpdated
);
}
...
...
@@ -2038,7 +2037,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
break
;
}
setInputDataBlock
(
pOperator
,
pInfo
->
binfo
.
pCtx
,
pBlock
,
pInfo
->
order
,
MAIN_SCAN
,
true
);
if
(
pBlock
->
info
.
type
==
STREAM_REPROCESS
)
{
SArray
*
pUpWins
=
taosArrayInit
(
8
,
sizeof
(
STimeWindow
));
doClearWindows
(
&
pInfo
->
aggSup
,
&
pInfo
->
binfo
,
&
pInfo
->
interval
,
pInfo
->
primaryTsIndex
,
pOperator
->
numOfExprs
,
...
...
@@ -2058,12 +2056,12 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
copyUpdateDataBlock
(
pInfo
->
pUpdateRes
,
pBlock
,
pInfo
->
primaryTsIndex
);
taosArrayDestroy
(
pUpWins
);
break
;
}
else
if
(
pBlock
->
info
.
type
==
STREAM_GET_ALL
&&
isFinalInterval
(
pInfo
)
&&
pInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_MAX_DELAY
)
{
}
else
if
(
pBlock
->
info
.
type
==
STREAM_GET_ALL
&&
isFinalInterval
(
pInfo
))
{
getAllIntervalWindow
(
pInfo
->
aggSup
.
pResultRowHashTable
,
pUpdated
);
continue
;
}
setInputDataBlock
(
pOperator
,
pInfo
->
binfo
.
pCtx
,
pBlock
,
pInfo
->
order
,
MAIN_SCAN
,
true
);
if
(
isFinalInterval
(
pInfo
))
{
int32_t
chIndex
=
getChildIndex
(
pBlock
);
int32_t
size
=
taosArrayGetSize
(
pInfo
->
pChildren
);
...
...
@@ -2125,6 +2123,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
.
calTrigger
=
pIntervalPhyNode
->
window
.
triggerType
,
.
maxTs
=
INT64_MIN
,
};
ASSERT
(
pInfo
->
twAggSup
.
calTrigger
!=
STREAM_TRIGGER_MAX_DELAY
);
pInfo
->
primaryTsIndex
=
((
SColumnNode
*
)
pIntervalPhyNode
->
window
.
pTspk
)
->
slotId
;
size_t
keyBufSize
=
sizeof
(
int64_t
)
+
sizeof
(
int64_t
)
+
POINTER_BYTES
;
initResultSizeInfo
(
pOperator
,
4096
);
...
...
@@ -2190,8 +2189,13 @@ _error:
}
void
destroyStreamAggSupporter
(
SStreamAggSupporter
*
pSup
)
{
taosArrayDestroy
(
pSup
->
pResultRows
);
taosMemoryFreeClear
(
pSup
->
pKeyBuf
);
void
**
pIte
=
NULL
;
while
((
pIte
=
taosHashIterate
(
pSup
->
pResultRows
,
pIte
))
!=
NULL
)
{
SArray
*
pWins
=
(
SArray
*
)
(
*
pIte
);
taosArrayDestroy
(
pWins
);
}
taosHashCleanup
(
pSup
->
pResultRows
);
destroyDiskbasedBuf
(
pSup
->
pResultBuf
);
}
...
...
@@ -2333,7 +2337,22 @@ static SResultWindowInfo* addNewSessionWindow(SArray* pWinInfos, TSKEY ts) {
return
taosArrayPush
(
pWinInfos
,
&
win
);
}
SResultWindowInfo
*
getSessionTimeWindow
(
SArray
*
pWinInfos
,
TSKEY
ts
,
int64_t
gap
,
int32_t
*
pIndex
)
{
SArray
*
getWinInfos
(
SStreamAggSupporter
*
pAggSup
,
uint64_t
groupId
)
{
void
**
ite
=
taosHashGet
(
pAggSup
->
pResultRows
,
&
groupId
,
sizeof
(
uint64_t
));
SArray
*
pWinInfos
=
NULL
;
if
(
ite
==
NULL
)
{
pWinInfos
=
taosArrayInit
(
1024
,
pAggSup
->
valueSize
);
taosHashPut
(
pAggSup
->
pResultRows
,
&
groupId
,
sizeof
(
uint64_t
),
&
pWinInfos
,
sizeof
(
void
*
));
}
else
{
pWinInfos
=
*
ite
;
}
return
pWinInfos
;
}
SResultWindowInfo
*
getSessionTimeWindow
(
SStreamAggSupporter
*
pAggSup
,
TSKEY
ts
,
uint64_t
groupId
,
int64_t
gap
,
int32_t
*
pIndex
)
{
SArray
*
pWinInfos
=
getWinInfos
(
pAggSup
,
groupId
);
pAggSup
->
pCurWins
=
pWinInfos
;
int32_t
size
=
taosArrayGetSize
(
pWinInfos
);
if
(
size
==
0
)
{
*
pIndex
=
0
;
...
...
@@ -2389,7 +2408,7 @@ static int32_t setWindowOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pRes
SStreamAggSupporter
*
pAggSup
,
SExecTaskInfo
*
pTaskInfo
)
{
assert
(
pWinInfo
->
win
.
skey
<=
pWinInfo
->
win
.
ekey
);
// too many time window in query
int32_t
size
=
taosArrayGetSize
(
pAggSup
->
p
ResultRow
s
);
int32_t
size
=
taosArrayGetSize
(
pAggSup
->
p
CurWin
s
);
if
(
size
>
MAX_INTERVAL_TIME_WINDOW
)
{
longjmp
(
pTaskInfo
->
env
,
TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW
);
}
...
...
@@ -2449,25 +2468,6 @@ static int32_t doOneStateWindowAgg(SStreamStateAggOperatorInfo* pInfo, SSDataBlo
pSDataBlock
,
pCurWin
,
pResult
,
startIndex
,
winRows
,
numOutput
,
pTaskInfo
);
}
int32_t
copyWinInfoToDataBlock
(
SSDataBlock
*
pBlock
,
SStreamAggSupporter
*
pAggSup
,
int32_t
start
,
int32_t
num
,
int32_t
numOfExprs
,
SOptrBasicInfo
*
pBinfo
)
{
for
(
int32_t
i
=
start
;
i
<
num
;
i
+=
1
)
{
SResultWindowInfo
*
pWinInfo
=
taosArrayGet
(
pAggSup
->
pResultRows
,
start
);
SFilePage
*
bufPage
=
getBufPage
(
pAggSup
->
pResultBuf
,
pWinInfo
->
pos
.
pageId
);
SResultRow
*
pRow
=
(
SResultRow
*
)((
char
*
)
bufPage
+
pWinInfo
->
pos
.
offset
);
for
(
int32_t
j
=
0
;
j
<
numOfExprs
;
++
j
)
{
SResultRowEntryInfo
*
pResultInfo
=
getResultCell
(
pRow
,
j
,
pBinfo
->
rowCellInfoOffset
);
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pBlock
->
pDataBlock
,
j
);
char
*
in
=
GET_ROWCELL_INTERBUF
(
pBinfo
->
pCtx
[
j
].
resultInfo
);
colDataAppend
(
pColInfoData
,
pBlock
->
info
.
rows
,
in
,
pResultInfo
->
isNullRes
);
}
pBlock
->
info
.
rows
+=
pRow
->
numOfRows
;
releaseBufPage
(
pAggSup
->
pResultBuf
,
bufPage
);
}
blockDataUpdateTsWindow
(
pBlock
,
-
1
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
getNumCompactWindow
(
SArray
*
pWinInfos
,
int32_t
startIndex
,
int64_t
gap
)
{
SResultWindowInfo
*
pCurWin
=
taosArrayGet
(
pWinInfos
,
startIndex
);
int32_t
size
=
taosArrayGetSize
(
pWinInfos
);
...
...
@@ -2484,15 +2484,15 @@ int32_t getNumCompactWindow(SArray* pWinInfos, int32_t startIndex, int64_t gap)
void
compactTimeWindow
(
SStreamSessionAggOperatorInfo
*
pInfo
,
int32_t
startIndex
,
int32_t
num
,
int32_t
groupId
,
int32_t
numOfOutput
,
SExecTaskInfo
*
pTaskInfo
,
SHashObj
*
pStUpdated
,
SHashObj
*
pStDeleted
)
{
SResultWindowInfo
*
pCurWin
=
taosArrayGet
(
pInfo
->
streamAggSup
.
p
ResultRow
s
,
startIndex
);
SResultWindowInfo
*
pCurWin
=
taosArrayGet
(
pInfo
->
streamAggSup
.
p
CurWin
s
,
startIndex
);
SResultRow
*
pCurResult
=
NULL
;
setWindowOutputBuf
(
pCurWin
,
&
pCurResult
,
pInfo
->
binfo
.
pCtx
,
groupId
,
numOfOutput
,
pInfo
->
binfo
.
rowCellInfoOffset
,
&
pInfo
->
streamAggSup
,
pTaskInfo
);
num
+=
startIndex
+
1
;
ASSERT
(
num
<=
taosArrayGetSize
(
pInfo
->
streamAggSup
.
p
ResultRow
s
));
ASSERT
(
num
<=
taosArrayGetSize
(
pInfo
->
streamAggSup
.
p
CurWin
s
));
// Just look for the window behind StartIndex
for
(
int32_t
i
=
startIndex
+
1
;
i
<
num
;
i
++
)
{
SResultWindowInfo
*
pWinInfo
=
taosArrayGet
(
pInfo
->
streamAggSup
.
p
ResultRow
s
,
i
);
SResultWindowInfo
*
pWinInfo
=
taosArrayGet
(
pInfo
->
streamAggSup
.
p
CurWin
s
,
i
);
SResultRow
*
pWinResult
=
NULL
;
setWindowOutputBuf
(
pWinInfo
,
&
pWinResult
,
pInfo
->
pDummyCtx
,
groupId
,
numOfOutput
,
pInfo
->
binfo
.
rowCellInfoOffset
,
&
pInfo
->
streamAggSup
,
pTaskInfo
);
...
...
@@ -2503,7 +2503,7 @@ void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex,
taosHashPut
(
pStDeleted
,
&
pWinInfo
->
pos
,
sizeof
(
SResultRowPosition
),
&
pWinInfo
->
win
.
skey
,
sizeof
(
TSKEY
));
pWinInfo
->
isOutput
=
false
;
}
taosArrayRemove
(
pInfo
->
streamAggSup
.
p
ResultRow
s
,
i
);
taosArrayRemove
(
pInfo
->
streamAggSup
.
p
CurWin
s
,
i
);
}
}
...
...
@@ -2533,7 +2533,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
SStreamAggSupporter
*
pAggSup
=
&
pInfo
->
streamAggSup
;
for
(
int32_t
i
=
0
;
i
<
pSDataBlock
->
info
.
rows
;)
{
int32_t
winIndex
=
0
;
SResultWindowInfo
*
pCurWin
=
getSessionTimeWindow
(
pAggSup
->
pResultRows
,
tsCols
[
i
]
,
gap
,
&
winIndex
);
SResultWindowInfo
*
pCurWin
=
getSessionTimeWindow
(
pAggSup
,
tsCols
[
i
],
pSDataBlock
->
info
.
groupId
,
gap
,
&
winIndex
);
winRows
=
updateSessionWindowInfo
(
pCurWin
,
tsCols
,
pSDataBlock
->
info
.
rows
,
i
,
pInfo
->
gap
,
pStDeleted
);
code
=
doOneWindowAgg
(
pInfo
,
pSDataBlock
,
pCurWin
,
&
pResult
,
i
,
winRows
,
numOfOutput
,
pTaskInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
||
pResult
==
NULL
)
{
...
...
@@ -2543,7 +2543,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
// doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &nextWin, startPos,
// forwardRows,
// pInfo->order, false);
int32_t
winNum
=
getNumCompactWindow
(
pAggSup
->
p
ResultRow
s
,
winIndex
,
gap
);
int32_t
winNum
=
getNumCompactWindow
(
pAggSup
->
p
CurWin
s
,
winIndex
,
gap
);
if
(
winNum
>
0
)
{
compactTimeWindow
(
pInfo
,
winIndex
,
winNum
,
groupId
,
numOfOutput
,
pTaskInfo
,
pStUpdated
,
pStDeleted
);
}
...
...
@@ -2566,7 +2566,7 @@ static void doClearSessionWindows(SStreamAggSupporter* pAggSup, SOptrBasicInfo*
int32_t
step
=
0
;
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
rows
;
i
+=
step
)
{
int32_t
winIndex
=
0
;
SResultWindowInfo
*
pCurWin
=
getSessionTimeWindow
(
pAggSup
->
pResultRows
,
tsCols
[
i
]
,
gap
,
&
winIndex
);
SResultWindowInfo
*
pCurWin
=
getSessionTimeWindow
(
pAggSup
,
tsCols
[
i
],
pBlock
->
info
.
groupId
,
gap
,
&
winIndex
);
step
=
updateSessionWindowInfo
(
pCurWin
,
tsCols
,
pBlock
->
info
.
rows
,
i
,
gap
,
NULL
);
ASSERT
(
isInWindow
(
pCurWin
,
tsCols
[
i
],
gap
));
doClearWindowImpl
(
&
pCurWin
->
pos
,
pAggSup
->
pResultBuf
,
pBinfo
,
numOfOutput
);
...
...
@@ -2627,7 +2627,7 @@ static void rebuildTimeWindow(SStreamSessionAggOperatorInfo* pInfo, SArray* pWin
for
(
int32_t
j
=
0
;
j
<
numOfChildren
;
j
++
)
{
SOperatorInfo
*
pChild
=
taosArrayGetP
(
pInfo
->
pChildren
,
j
);
SStreamSessionAggOperatorInfo
*
pChInfo
=
pChild
->
info
;
SArray
*
pChWins
=
pChInfo
->
streamAggSup
.
pResultRows
;
SArray
*
pChWins
=
getWinInfos
(
&
pChInfo
->
streamAggSup
,
groupId
)
;
int32_t
chWinSize
=
taosArrayGetSize
(
pChWins
);
int32_t
index
=
binarySearch
(
pChWins
,
chWinSize
,
pParentWin
->
win
.
skey
,
TSDB_ORDER_DESC
,
getSessionWindowEndkey
);
for
(
int32_t
k
=
index
;
k
>
0
&&
k
<
chWinSize
;
k
++
)
{
...
...
@@ -2651,36 +2651,44 @@ typedef SResultWindowInfo* (*__get_win_info_)(void*);
SResultWindowInfo
*
getSessionWinInfo
(
void
*
pData
)
{
return
(
SResultWindowInfo
*
)
pData
;
}
SResultWindowInfo
*
getStateWinInfo
(
void
*
pData
)
{
return
&
((
SStateWindowInfo
*
)
pData
)
->
winInfo
;
}
int32_t
closeSessionWindow
(
S
Array
*
pWins
,
STimeWindowAggSupp
*
pTwSup
,
SArray
*
pClosed
,
int32_t
closeSessionWindow
(
S
HashObj
*
pHashMap
,
STimeWindowAggSupp
*
pTwSup
,
SArray
*
pClosed
,
__get_win_info_
fn
)
{
// Todo(liuyao) save window to tdb
int32_t
size
=
taosArrayGetSize
(
pWins
);
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
void
*
pWin
=
taosArrayGet
(
pWins
,
i
);
SResultWindowInfo
*
pSeWin
=
fn
(
pWin
);
if
(
pSeWin
->
win
.
ekey
<
pTwSup
->
maxTs
-
pTwSup
->
waterMark
)
{
if
(
!
pSeWin
->
isClosed
)
{
pSeWin
->
isClosed
=
true
;
if
(
pTwSup
->
calTrigger
==
STREAM_TRIGGER_WINDOW_CLOSE
)
{
int32_t
code
=
saveResult
(
pSeWin
->
win
.
skey
,
pSeWin
->
pos
.
pageId
,
pSeWin
->
pos
.
offset
,
0
,
pClosed
);
pSeWin
->
isOutput
=
true
;
void
**
pIte
=
NULL
;
while
((
pIte
=
taosHashIterate
(
pHashMap
,
pIte
))
!=
NULL
)
{
SArray
*
pWins
=
(
SArray
*
)
(
*
pIte
);
int32_t
size
=
taosArrayGetSize
(
pWins
);
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
void
*
pWin
=
taosArrayGet
(
pWins
,
i
);
SResultWindowInfo
*
pSeWin
=
fn
(
pWin
);
if
(
pSeWin
->
win
.
ekey
<
pTwSup
->
maxTs
-
pTwSup
->
waterMark
)
{
if
(
!
pSeWin
->
isClosed
)
{
pSeWin
->
isClosed
=
true
;
if
(
pTwSup
->
calTrigger
==
STREAM_TRIGGER_WINDOW_CLOSE
)
{
int32_t
code
=
saveResult
(
pSeWin
->
win
.
skey
,
pSeWin
->
pos
.
pageId
,
pSeWin
->
pos
.
offset
,
0
,
pClosed
);
pSeWin
->
isOutput
=
true
;
}
}
continue
;
}
continue
;
break
;
}
break
;
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
getAllSessionWindow
(
SArray
*
pWins
,
SArray
*
pClosed
,
__get_win_info_
fn
)
{
int32_t
size
=
taosArrayGetSize
(
pWins
);
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
void
*
pWin
=
taosArrayGet
(
pWins
,
i
);
SResultWindowInfo
*
pSeWin
=
fn
(
pWin
);
if
(
!
pSeWin
->
isClosed
)
{
int32_t
code
=
saveResult
(
pSeWin
->
win
.
skey
,
pSeWin
->
pos
.
pageId
,
pSeWin
->
pos
.
offset
,
0
,
pClosed
);
pSeWin
->
isOutput
=
true
;
int32_t
getAllSessionWindow
(
SHashObj
*
pHashMap
,
SArray
*
pClosed
,
__get_win_info_
fn
)
{
void
**
pIte
=
NULL
;
while
((
pIte
=
taosHashIterate
(
pHashMap
,
pIte
))
!=
NULL
)
{
SArray
*
pWins
=
(
SArray
*
)
(
*
pIte
);
int32_t
size
=
taosArrayGetSize
(
pWins
);
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
void
*
pWin
=
taosArrayGet
(
pWins
,
i
);
SResultWindowInfo
*
pSeWin
=
fn
(
pWin
);
if
(
!
pSeWin
->
isClosed
)
{
int32_t
code
=
saveResult
(
pSeWin
->
win
.
skey
,
pSeWin
->
pos
.
pageId
,
pSeWin
->
pos
.
offset
,
0
,
pClosed
);
pSeWin
->
isOutput
=
true
;
}
}
}
return
TSDB_CODE_SUCCESS
;
...
...
@@ -2714,8 +2722,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
if
(
pBlock
==
NULL
)
{
break
;
}
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock
(
pOperator
,
pBInfo
->
pCtx
,
pBlock
,
TSDB_ORDER_ASC
,
MAIN_SCAN
,
true
);
if
(
pBlock
->
info
.
type
==
STREAM_REPROCESS
)
{
SArray
*
pWins
=
taosArrayInit
(
16
,
sizeof
(
SResultWindowInfo
));
doClearSessionWindows
(
&
pInfo
->
streamAggSup
,
&
pInfo
->
binfo
,
pBlock
,
0
,
pOperator
->
numOfExprs
,
pInfo
->
gap
,
pWins
);
...
...
@@ -2729,12 +2736,13 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
}
taosArrayDestroy
(
pWins
);
continue
;
}
else
if
(
pBlock
->
info
.
type
==
STREAM_GET_ALL
&&
pInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_MAX_DELAY
)
{
}
else
if
(
pBlock
->
info
.
type
==
STREAM_GET_ALL
)
{
getAllSessionWindow
(
pInfo
->
streamAggSup
.
pResultRows
,
pUpdated
,
getSessionWinInfo
);
continue
;
}
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock
(
pOperator
,
pBInfo
->
pCtx
,
pBlock
,
TSDB_ORDER_ASC
,
MAIN_SCAN
,
true
);
if
(
isFinalSession
(
pInfo
))
{
int32_t
childIndex
=
0
;
// Todo(liuyao) get child id from SSDataBlock
SOptrBasicInfo
*
pChildOp
=
taosArrayGetP
(
pInfo
->
pChildren
,
childIndex
);
...
...
@@ -2873,7 +2881,9 @@ bool isEqualStateKey(SStateWindowInfo* pWin, char* pKeyData) {
return
pKeyData
&&
compareVal
(
pKeyData
,
&
pWin
->
stateKey
);
}
SStateWindowInfo
*
getStateWindowByTs
(
SArray
*
pWinInfos
,
TSKEY
ts
,
int32_t
*
pIndex
)
{
SStateWindowInfo
*
getStateWindowByTs
(
SStreamAggSupporter
*
pAggSup
,
TSKEY
ts
,
uint64_t
groupId
,
int32_t
*
pIndex
)
{
SArray
*
pWinInfos
=
getWinInfos
(
pAggSup
,
groupId
);
pAggSup
->
pCurWins
=
pWinInfos
;
int32_t
size
=
taosArrayGetSize
(
pWinInfos
);
int32_t
index
=
binarySearch
(
pWinInfos
,
size
,
ts
,
TSDB_ORDER_DESC
,
getStateWinTsKey
);
SStateWindowInfo
*
pWin
=
NULL
;
...
...
@@ -2896,7 +2906,10 @@ SStateWindowInfo* getStateWindowByTs(SArray* pWinInfos, TSKEY ts, int32_t* pInde
return
NULL
;
}
SStateWindowInfo
*
getStateWindow
(
SArray
*
pWinInfos
,
TSKEY
ts
,
char
*
pKeyData
,
SColumn
*
pCol
,
int32_t
*
pIndex
)
{
SStateWindowInfo
*
getStateWindow
(
SStreamAggSupporter
*
pAggSup
,
TSKEY
ts
,
uint64_t
groupId
,
char
*
pKeyData
,
SColumn
*
pCol
,
int32_t
*
pIndex
)
{
SArray
*
pWinInfos
=
getWinInfos
(
pAggSup
,
groupId
);
pAggSup
->
pCurWins
=
pWinInfos
;
int32_t
size
=
taosArrayGetSize
(
pWinInfos
);
if
(
size
==
0
)
{
*
pIndex
=
0
;
...
...
@@ -2987,16 +3000,16 @@ static void doClearStateWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBloc
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
rows
;
i
+=
step
)
{
char
*
pKeyData
=
colDataGetData
(
pKeyColInfo
,
i
);
int32_t
winIndex
=
0
;
SStateWindowInfo
*
pCurWin
=
getStateWindowByTs
(
pAggSup
->
pResultRows
,
tsCol
[
i
]
,
&
winIndex
);
SStateWindowInfo
*
pCurWin
=
getStateWindowByTs
(
pAggSup
,
tsCol
[
i
],
pBlock
->
info
.
groupId
,
&
winIndex
);
if
(
!
pCurWin
)
{
continue
;
}
step
=
updateStateWindowInfo
(
pAggSup
->
p
ResultRow
s
,
winIndex
,
tsCol
,
pKeyColInfo
,
pBlock
->
info
.
rows
,
i
,
&
allEqual
,
step
=
updateStateWindowInfo
(
pAggSup
->
p
CurWin
s
,
winIndex
,
tsCol
,
pKeyColInfo
,
pBlock
->
info
.
rows
,
i
,
&
allEqual
,
pSeDeleted
);
ASSERT
(
isTsInWindow
(
pCurWin
,
tsCol
[
i
])
||
isEqualStateKey
(
pCurWin
,
pKeyData
));
taosArrayPush
(
pAggSup
->
pScanWindow
,
&
pCurWin
->
winInfo
.
win
);
taosHashRemove
(
pSeUpdated
,
&
pCurWin
->
winInfo
.
pos
,
sizeof
(
SResultRowPosition
));
deleteWindow
(
pAggSup
->
p
ResultRow
s
,
winIndex
);
deleteWindow
(
pAggSup
->
p
CurWin
s
,
winIndex
);
}
}
...
...
@@ -3026,13 +3039,15 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
char
*
pKeyData
=
colDataGetData
(
pKeyColInfo
,
i
);
int32_t
winIndex
=
0
;
bool
allEqual
=
true
;
SStateWindowInfo
*
pCurWin
=
getStateWindow
(
pAggSup
->
pResultRows
,
tsCols
[
i
],
pKeyData
,
&
pInfo
->
stateCol
,
&
winIndex
);
winRows
=
updateStateWindowInfo
(
pAggSup
->
pResultRows
,
winIndex
,
tsCols
,
pKeyColInfo
,
pSDataBlock
->
info
.
rows
,
i
,
&
allEqual
,
pInfo
->
pSeDeleted
);
SStateWindowInfo
*
pCurWin
=
getStateWindow
(
pAggSup
,
tsCols
[
i
],
pSDataBlock
->
info
.
groupId
,
pKeyData
,
&
pInfo
->
stateCol
,
&
winIndex
);
winRows
=
updateStateWindowInfo
(
pAggSup
->
pCurWins
,
winIndex
,
tsCols
,
pKeyColInfo
,
pSDataBlock
->
info
.
rows
,
i
,
&
allEqual
,
pInfo
->
pSeDeleted
);
if
(
!
allEqual
)
{
taosArrayPush
(
pAggSup
->
pScanWindow
,
&
pCurWin
->
winInfo
.
win
);
taosHashRemove
(
pSeUpdated
,
&
pCurWin
->
winInfo
.
pos
,
sizeof
(
SResultRowPosition
));
deleteWindow
(
pAggSup
->
p
ResultRow
s
,
winIndex
);
deleteWindow
(
pAggSup
->
p
CurWin
s
,
winIndex
);
continue
;
}
code
=
doOneStateWindowAgg
(
pInfo
,
pSDataBlock
,
&
pCurWin
->
winInfo
,
&
pResult
,
i
,
winRows
,
numOfOutput
,
pTaskInfo
);
...
...
@@ -3079,17 +3094,18 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
if
(
pBlock
==
NULL
)
{
break
;
}
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock
(
pOperator
,
pBInfo
->
pCtx
,
pBlock
,
TSDB_ORDER_ASC
,
MAIN_SCAN
,
true
);
if
(
pBlock
->
info
.
type
==
STREAM_REPROCESS
)
{
doClearStateWindows
(
&
pInfo
->
streamAggSup
,
pBlock
,
pInfo
->
primaryTsIndex
,
&
pInfo
->
stateCol
,
pInfo
->
stateCol
.
slotId
,
pSeUpdated
,
pInfo
->
pSeDeleted
);
continue
;
}
else
if
(
pBlock
->
info
.
type
==
STREAM_GET_ALL
&&
pInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_MAX_DELAY
)
{
}
else
if
(
pBlock
->
info
.
type
==
STREAM_GET_ALL
)
{
getAllSessionWindow
(
pInfo
->
streamAggSup
.
pResultRows
,
pUpdated
,
getStateWinInfo
);
continue
;
}
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock
(
pOperator
,
pBInfo
->
pCtx
,
pBlock
,
TSDB_ORDER_ASC
,
MAIN_SCAN
,
true
);
doStreamStateAggImpl
(
pOperator
,
pBlock
,
pSeUpdated
,
pInfo
->
pSeDeleted
);
pInfo
->
twAggSup
.
maxTs
=
TMAX
(
pInfo
->
twAggSup
.
maxTs
,
pBlock
->
info
.
window
.
ekey
);
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录