Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
d37d4e5b
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
d37d4e5b
编写于
5月 12, 2022
作者:
S
slzhou@taodata.com
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
error processing of executor when calling aggregate function
上级
c8b05030
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
47 addition
and
30 deletion
+47
-30
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+2
-2
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+28
-11
source/libs/executor/src/groupoperator.c
source/libs/executor/src/groupoperator.c
+4
-4
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+13
-13
未找到文件。
source/libs/executor/inc/executorimpl.h
浏览文件 @
d37d4e5b
...
...
@@ -616,10 +616,10 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t
int32_t
initAggInfo
(
SOptrBasicInfo
*
pBasicInfo
,
SAggSupporter
*
pAggSup
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResultBlock
,
size_t
keyBufSize
,
const
char
*
pkey
);
void
initResultSizeInfo
(
SOperatorInfo
*
pOperator
,
int32_t
numOfRows
);
void
doBuildResultDatablock
(
SOptrBasicInfo
*
pbInfo
,
SGroupResInfo
*
pGroupResInfo
,
SExprInfo
*
pExprInfo
,
SDiskbasedBuf
*
pBuf
);
void
doBuildResultDatablock
(
S
ExecTaskInfo
*
taskInfo
,
S
OptrBasicInfo
*
pbInfo
,
SGroupResInfo
*
pGroupResInfo
,
SExprInfo
*
pExprInfo
,
SDiskbasedBuf
*
pBuf
);
void
finalizeMultiTupleQueryResult
(
int32_t
numOfOutput
,
SDiskbasedBuf
*
pBuf
,
SResultRowInfo
*
pResultRowInfo
,
int32_t
*
rowCellInfoOffset
);
void
doApplyFunctions
(
SqlFunctionCtx
*
pCtx
,
STimeWindow
*
pWin
,
SColumnInfoData
*
pTimeWindowData
,
int32_t
offset
,
void
doApplyFunctions
(
S
ExecTaskInfo
*
taskInfo
,
S
qlFunctionCtx
*
pCtx
,
STimeWindow
*
pWin
,
SColumnInfoData
*
pTimeWindowData
,
int32_t
offset
,
int32_t
forwardStep
,
TSKEY
*
tsCol
,
int32_t
numOfTotal
,
int32_t
numOfOutput
,
int32_t
order
);
int32_t
setGroupResultOutputBuf
(
SOptrBasicInfo
*
binfo
,
int32_t
numOfCols
,
char
*
pData
,
int16_t
type
,
int16_t
bytes
,
int32_t
groupId
,
SDiskbasedBuf
*
pBuf
,
SExecTaskInfo
*
pTaskInfo
,
SAggSupporter
*
pAggSup
);
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
d37d4e5b
...
...
@@ -155,7 +155,7 @@ SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn,
void
operatorDummyCloseFn
(
void
*
param
,
int32_t
numOfCols
)
{}
static
int32_t
doCopyToSDataBlock
(
SSDataBlock
*
pBlock
,
SExprInfo
*
pExprInfo
,
SDiskbasedBuf
*
pBuf
,
static
int32_t
doCopyToSDataBlock
(
S
ExecTaskInfo
*
taskInfo
,
S
SDataBlock
*
pBlock
,
SExprInfo
*
pExprInfo
,
SDiskbasedBuf
*
pBuf
,
SGroupResInfo
*
pGroupResInfo
,
int32_t
orderType
,
int32_t
*
rowCellOffset
,
SqlFunctionCtx
*
pCtx
);
...
...
@@ -579,7 +579,7 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow
colDataAppendInt64
(
pColData
,
4
,
&
pQueryWindow
->
ekey
);
}
void
doApplyFunctions
(
SqlFunctionCtx
*
pCtx
,
STimeWindow
*
pWin
,
SColumnInfoData
*
pTimeWindowData
,
int32_t
offset
,
void
doApplyFunctions
(
S
ExecTaskInfo
*
taskInfo
,
S
qlFunctionCtx
*
pCtx
,
STimeWindow
*
pWin
,
SColumnInfoData
*
pTimeWindowData
,
int32_t
offset
,
int32_t
forwardStep
,
TSKEY
*
tsCol
,
int32_t
numOfTotal
,
int32_t
numOfOutput
,
int32_t
order
)
{
for
(
int32_t
k
=
0
;
k
<
numOfOutput
;
++
k
)
{
pCtx
[
k
].
startTs
=
pWin
->
skey
;
...
...
@@ -618,9 +618,14 @@ void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData*
pEntryInfo
->
numOfRes
=
1
;
continue
;
}
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
(
functionNeedToExecute
(
&
pCtx
[
k
])
&&
pCtx
[
k
].
fpSet
.
process
!=
NULL
)
{
pCtx
[
k
].
fpSet
.
process
(
&
pCtx
[
k
]);
code
=
pCtx
[
k
].
fpSet
.
process
(
&
pCtx
[
k
]);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"%s apply functions error, code: %s"
,
GET_TASKID
(
taskInfo
),
tstrerror
(
code
));
taskInfo
->
code
=
code
;
longjmp
(
taskInfo
->
env
,
code
);
}
}
// restore it
...
...
@@ -806,7 +811,13 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunction
// this can be set during create the struct
// todo add a dummy funtion to avoid process check
if
(
pCtx
[
k
].
fpSet
.
process
!=
NULL
)
{
pCtx
[
k
].
fpSet
.
process
(
&
pCtx
[
k
]);
int32_t
code
=
pCtx
[
k
].
fpSet
.
process
(
&
pCtx
[
k
]);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"%s call aggregate function error happens, code : %s"
,
GET_TASKID
(
pOperator
->
pTaskInfo
),
tstrerror
(
code
));
pOperator
->
pTaskInfo
->
code
=
code
;
longjmp
(
pOperator
->
pTaskInfo
->
env
,
code
);
}
}
}
}
...
...
@@ -2176,7 +2187,7 @@ void setExecutionContext(int32_t numOfOutput, uint64_t groupId, SExecTaskInfo* p
* @param pQInfo
* @param result
*/
int32_t
doCopyToSDataBlock
(
SSDataBlock
*
pBlock
,
SExprInfo
*
pExprInfo
,
SDiskbasedBuf
*
pBuf
,
SGroupResInfo
*
pGroupResInfo
,
int32_t
doCopyToSDataBlock
(
S
ExecTaskInfo
*
taskInfo
,
S
SDataBlock
*
pBlock
,
SExprInfo
*
pExprInfo
,
SDiskbasedBuf
*
pBuf
,
SGroupResInfo
*
pGroupResInfo
,
int32_t
orderType
,
int32_t
*
rowCellOffset
,
SqlFunctionCtx
*
pCtx
)
{
int32_t
numOfRows
=
getNumOfTotalRes
(
pGroupResInfo
);
int32_t
numOfResult
=
pBlock
->
info
.
rows
;
// there are already exists result rows
...
...
@@ -2215,8 +2226,14 @@ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbased
int32_t
slotId
=
pExprInfo
[
j
].
base
.
resSchema
.
slotId
;
pCtx
[
j
].
resultInfo
=
getResultCell
(
pRow
,
j
,
rowCellOffset
);
if
(
pCtx
[
j
].
fpSet
.
process
)
{
pCtx
[
j
].
fpSet
.
finalize
(
&
pCtx
[
j
],
pBlock
);
if
(
pCtx
[
j
].
fpSet
.
finalize
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
code
=
pCtx
[
j
].
fpSet
.
finalize
(
&
pCtx
[
j
],
pBlock
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"%s build result data block error, code %s"
,
GET_TASKID
(
taskInfo
),
tstrerror
(
code
));
taskInfo
->
code
=
code
;
longjmp
(
taskInfo
->
env
,
code
);
}
}
else
if
(
strcmp
(
pCtx
[
j
].
pExpr
->
pExpr
->
_function
.
functionName
,
"_select_value"
)
==
0
)
{
// do nothing, todo refactor
}
else
{
...
...
@@ -2243,7 +2260,7 @@ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbased
return
0
;
}
void
doBuildResultDatablock
(
SOptrBasicInfo
*
pbInfo
,
SGroupResInfo
*
pGroupResInfo
,
SExprInfo
*
pExprInfo
,
void
doBuildResultDatablock
(
S
ExecTaskInfo
*
taskInfo
,
S
OptrBasicInfo
*
pbInfo
,
SGroupResInfo
*
pGroupResInfo
,
SExprInfo
*
pExprInfo
,
SDiskbasedBuf
*
pBuf
)
{
assert
(
pGroupResInfo
->
currentGroup
<=
pGroupResInfo
->
totalGroup
);
...
...
@@ -2257,7 +2274,7 @@ void doBuildResultDatablock(SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo
}
int32_t
orderType
=
TSDB_ORDER_ASC
;
doCopyToSDataBlock
(
pBlock
,
pExprInfo
,
pBuf
,
pGroupResInfo
,
orderType
,
rowCellOffset
,
pCtx
);
doCopyToSDataBlock
(
taskInfo
,
pBlock
,
pExprInfo
,
pBuf
,
pGroupResInfo
,
orderType
,
rowCellOffset
,
pCtx
);
// add condition (pBlock->info.rows >= 1) just to runtime happy
blockDataUpdateTsWindow
(
pBlock
);
...
...
@@ -3749,7 +3766,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
}
blockDataEnsureCapacity
(
pInfo
->
pRes
,
pOperator
->
resultInfo
.
capacity
);
doBuildResultDatablock
(
pInfo
,
&
pAggInfo
->
groupResInfo
,
pOperator
->
pExpr
,
pAggInfo
->
aggSup
.
pResultBuf
);
doBuildResultDatablock
(
p
TaskInfo
,
p
Info
,
&
pAggInfo
->
groupResInfo
,
pOperator
->
pExpr
,
pAggInfo
->
aggSup
.
pResultBuf
);
if
(
pInfo
->
pRes
->
info
.
rows
==
0
||
!
hasRemainDataInCurrentGroup
(
&
pAggInfo
->
groupResInfo
))
{
doSetOperatorCompleted
(
pOperator
);
}
...
...
source/libs/executor/src/groupoperator.c
浏览文件 @
d37d4e5b
...
...
@@ -234,7 +234,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
}
int32_t
rowIndex
=
j
-
num
;
doApplyFunctions
(
pCtx
,
&
w
,
NULL
,
rowIndex
,
num
,
NULL
,
pBlock
->
info
.
rows
,
pOperator
->
numOfExprs
,
TSDB_ORDER_ASC
);
doApplyFunctions
(
p
TaskInfo
,
p
Ctx
,
&
w
,
NULL
,
rowIndex
,
num
,
NULL
,
pBlock
->
info
.
rows
,
pOperator
->
numOfExprs
,
TSDB_ORDER_ASC
);
// assign the group keys or user input constant values if required
doAssignGroupKeys
(
pCtx
,
pOperator
->
numOfExprs
,
pBlock
->
info
.
rows
,
rowIndex
);
...
...
@@ -252,7 +252,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
}
int32_t
rowIndex
=
pBlock
->
info
.
rows
-
num
;
doApplyFunctions
(
pCtx
,
&
w
,
NULL
,
rowIndex
,
num
,
NULL
,
pBlock
->
info
.
rows
,
pOperator
->
numOfExprs
,
TSDB_ORDER_ASC
);
doApplyFunctions
(
p
TaskInfo
,
p
Ctx
,
&
w
,
NULL
,
rowIndex
,
num
,
NULL
,
pBlock
->
info
.
rows
,
pOperator
->
numOfExprs
,
TSDB_ORDER_ASC
);
doAssignGroupKeys
(
pCtx
,
pOperator
->
numOfExprs
,
pBlock
->
info
.
rows
,
rowIndex
);
}
}
...
...
@@ -268,7 +268,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
SSDataBlock
*
pRes
=
pInfo
->
binfo
.
pRes
;
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
doBuildResultDatablock
(
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pOperator
->
pExpr
,
pInfo
->
aggSup
.
pResultBuf
);
doBuildResultDatablock
(
pTaskInfo
,
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pOperator
->
pExpr
,
pInfo
->
aggSup
.
pResultBuf
);
if
(
pRes
->
info
.
rows
==
0
||
!
hasRemainDataInCurrentGroup
(
&
pInfo
->
groupResInfo
))
{
pOperator
->
status
=
OP_EXEC_DONE
;
}
...
...
@@ -317,7 +317,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
initGroupedResultInfo
(
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultRowHashTable
,
false
);
while
(
1
)
{
doBuildResultDatablock
(
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pOperator
->
pExpr
,
pInfo
->
aggSup
.
pResultBuf
);
doBuildResultDatablock
(
pTaskInfo
,
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pOperator
->
pExpr
,
pInfo
->
aggSup
.
pResultBuf
);
doFilter
(
pInfo
->
pCondition
,
pRes
);
bool
hasRemain
=
hasRemainDataInCurrentGroup
(
&
pInfo
->
groupResInfo
);
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
d37d4e5b
...
...
@@ -703,7 +703,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
pInfo
->
order
,
false
);
updateTimeWindowInfo
(
&
pInfo
->
twAggSup
.
timeWindowData
,
&
win
,
true
);
doApplyFunctions
(
pInfo
->
binfo
.
pCtx
,
&
win
,
&
pInfo
->
twAggSup
.
timeWindowData
,
startPos
,
forwardStep
,
tsCols
,
doApplyFunctions
(
p
TaskInfo
,
p
Info
->
binfo
.
pCtx
,
&
win
,
&
pInfo
->
twAggSup
.
timeWindowData
,
startPos
,
forwardStep
,
tsCols
,
pSDataBlock
->
info
.
rows
,
numOfOutput
,
TSDB_ORDER_ASC
);
STimeWindow
nextWin
=
win
;
...
...
@@ -740,7 +740,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
pInfo
->
order
,
false
);
updateTimeWindowInfo
(
&
pInfo
->
twAggSup
.
timeWindowData
,
&
nextWin
,
true
);
doApplyFunctions
(
pInfo
->
binfo
.
pCtx
,
&
nextWin
,
&
pInfo
->
twAggSup
.
timeWindowData
,
startPos
,
forwardStep
,
tsCols
,
doApplyFunctions
(
p
TaskInfo
,
p
Info
->
binfo
.
pCtx
,
&
nextWin
,
&
pInfo
->
twAggSup
.
timeWindowData
,
startPos
,
forwardStep
,
tsCols
,
pSDataBlock
->
info
.
rows
,
numOfOutput
,
TSDB_ORDER_ASC
);
}
...
...
@@ -855,7 +855,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
}
updateTimeWindowInfo
(
&
pInfo
->
twAggSup
.
timeWindowData
,
&
window
,
false
);
doApplyFunctions
(
pInfo
->
binfo
.
pCtx
,
&
window
,
&
pInfo
->
twAggSup
.
timeWindowData
,
pRowSup
->
startRowIndex
,
doApplyFunctions
(
p
TaskInfo
,
p
Info
->
binfo
.
pCtx
,
&
window
,
&
pInfo
->
twAggSup
.
timeWindowData
,
pRowSup
->
startRowIndex
,
pRowSup
->
numOfRows
,
NULL
,
pBlock
->
info
.
rows
,
numOfOutput
,
TSDB_ORDER_ASC
);
// here we start a new session window
...
...
@@ -874,7 +874,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
}
updateTimeWindowInfo
(
&
pInfo
->
twAggSup
.
timeWindowData
,
&
pRowSup
->
win
,
false
);
doApplyFunctions
(
pInfo
->
binfo
.
pCtx
,
&
pRowSup
->
win
,
&
pInfo
->
twAggSup
.
timeWindowData
,
pRowSup
->
startRowIndex
,
doApplyFunctions
(
p
TaskInfo
,
p
Info
->
binfo
.
pCtx
,
&
pRowSup
->
win
,
&
pInfo
->
twAggSup
.
timeWindowData
,
pRowSup
->
startRowIndex
,
pRowSup
->
numOfRows
,
NULL
,
pBlock
->
info
.
rows
,
numOfOutput
,
TSDB_ORDER_ASC
);
}
...
...
@@ -888,7 +888,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
SOptrBasicInfo
*
pBInfo
=
&
pInfo
->
binfo
;
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
doBuildResultDatablock
(
pBInfo
,
&
pInfo
->
groupResInfo
,
pOperator
->
pExpr
,
pInfo
->
aggSup
.
pResultBuf
);
doBuildResultDatablock
(
p
TaskInfo
,
p
BInfo
,
&
pInfo
->
groupResInfo
,
pOperator
->
pExpr
,
pInfo
->
aggSup
.
pResultBuf
);
if
(
pBInfo
->
pRes
->
info
.
rows
==
0
||
!
hasRemainDataInCurrentGroup
(
&
pInfo
->
groupResInfo
))
{
doSetOperatorCompleted
(
pOperator
);
return
NULL
;
...
...
@@ -921,7 +921,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
initGroupedResultInfo
(
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultRowHashTable
,
true
);
blockDataEnsureCapacity
(
pBInfo
->
pRes
,
pOperator
->
resultInfo
.
capacity
);
doBuildResultDatablock
(
pBInfo
,
&
pInfo
->
groupResInfo
,
pOperator
->
pExpr
,
pInfo
->
aggSup
.
pResultBuf
);
doBuildResultDatablock
(
p
TaskInfo
,
p
BInfo
,
&
pInfo
->
groupResInfo
,
pOperator
->
pExpr
,
pInfo
->
aggSup
.
pResultBuf
);
if
(
pBInfo
->
pRes
->
info
.
rows
==
0
||
!
hasRemainDataInCurrentGroup
(
&
pInfo
->
groupResInfo
))
{
doSetOperatorCompleted
(
pOperator
);
}
...
...
@@ -948,7 +948,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
}
blockDataEnsureCapacity
(
pBlock
,
pOperator
->
resultInfo
.
capacity
);
doBuildResultDatablock
(
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pOperator
->
pExpr
,
pInfo
->
aggSup
.
pResultBuf
);
doBuildResultDatablock
(
pTaskInfo
,
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pOperator
->
pExpr
,
pInfo
->
aggSup
.
pResultBuf
);
if
(
pBlock
->
info
.
rows
==
0
||
!
hasRemainDataInCurrentGroup
(
&
pInfo
->
groupResInfo
))
{
doSetOperatorCompleted
(
pOperator
);
...
...
@@ -998,7 +998,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
}
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
doBuildResultDatablock
(
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pOperator
->
pExpr
,
pInfo
->
aggSup
.
pResultBuf
);
doBuildResultDatablock
(
pOperator
->
pTaskInfo
,
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pOperator
->
pExpr
,
pInfo
->
aggSup
.
pResultBuf
);
if
(
pInfo
->
binfo
.
pRes
->
info
.
rows
==
0
||
!
hasRemainDataInCurrentGroup
(
&
pInfo
->
groupResInfo
))
{
pOperator
->
status
=
OP_EXEC_DONE
;
}
...
...
@@ -1035,7 +1035,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
initMultiResInfoFromArrayList
(
&
pInfo
->
groupResInfo
,
pUpdated
);
blockDataEnsureCapacity
(
pInfo
->
binfo
.
pRes
,
pOperator
->
resultInfo
.
capacity
);
doBuildResultDatablock
(
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pOperator
->
pExpr
,
pInfo
->
aggSup
.
pResultBuf
);
doBuildResultDatablock
(
pOperator
->
pTaskInfo
,
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pOperator
->
pExpr
,
pInfo
->
aggSup
.
pResultBuf
);
// TODO: remove for stream
/*ASSERT(pInfo->binfo.pRes->info.rows > 0);*/
...
...
@@ -1233,7 +1233,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
// pInfo->numOfRows data belong to the current session window
updateTimeWindowInfo
(
&
pInfo
->
twAggSup
.
timeWindowData
,
&
window
,
false
);
doApplyFunctions
(
pInfo
->
binfo
.
pCtx
,
&
window
,
&
pInfo
->
twAggSup
.
timeWindowData
,
pRowSup
->
startRowIndex
,
doApplyFunctions
(
p
TaskInfo
,
p
Info
->
binfo
.
pCtx
,
&
window
,
&
pInfo
->
twAggSup
.
timeWindowData
,
pRowSup
->
startRowIndex
,
pRowSup
->
numOfRows
,
NULL
,
pBlock
->
info
.
rows
,
numOfOutput
,
TSDB_ORDER_ASC
);
// here we start a new session window
...
...
@@ -1252,7 +1252,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
}
updateTimeWindowInfo
(
&
pInfo
->
twAggSup
.
timeWindowData
,
&
pRowSup
->
win
,
false
);
doApplyFunctions
(
pInfo
->
binfo
.
pCtx
,
&
pRowSup
->
win
,
&
pInfo
->
twAggSup
.
timeWindowData
,
pRowSup
->
startRowIndex
,
doApplyFunctions
(
p
TaskInfo
,
p
Info
->
binfo
.
pCtx
,
&
pRowSup
->
win
,
&
pInfo
->
twAggSup
.
timeWindowData
,
pRowSup
->
startRowIndex
,
pRowSup
->
numOfRows
,
NULL
,
pBlock
->
info
.
rows
,
numOfOutput
,
TSDB_ORDER_ASC
);
}
...
...
@@ -1265,7 +1265,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
SOptrBasicInfo
*
pBInfo
=
&
pInfo
->
binfo
;
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
doBuildResultDatablock
(
pBInfo
,
&
pInfo
->
groupResInfo
,
pOperator
->
pExpr
,
pInfo
->
aggSup
.
pResultBuf
);
doBuildResultDatablock
(
p
Operator
->
pTaskInfo
,
p
BInfo
,
&
pInfo
->
groupResInfo
,
pOperator
->
pExpr
,
pInfo
->
aggSup
.
pResultBuf
);
if
(
pBInfo
->
pRes
->
info
.
rows
==
0
||
!
hasRemainDataInCurrentGroup
(
&
pInfo
->
groupResInfo
))
{
doSetOperatorCompleted
(
pOperator
);
return
NULL
;
...
...
@@ -1298,7 +1298,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
initGroupedResultInfo
(
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultRowHashTable
,
true
);
blockDataEnsureCapacity
(
pBInfo
->
pRes
,
pOperator
->
resultInfo
.
capacity
);
doBuildResultDatablock
(
pBInfo
,
&
pInfo
->
groupResInfo
,
pOperator
->
pExpr
,
pInfo
->
aggSup
.
pResultBuf
);
doBuildResultDatablock
(
p
Operator
->
pTaskInfo
,
p
BInfo
,
&
pInfo
->
groupResInfo
,
pOperator
->
pExpr
,
pInfo
->
aggSup
.
pResultBuf
);
if
(
pBInfo
->
pRes
->
info
.
rows
==
0
||
!
hasRemainDataInCurrentGroup
(
&
pInfo
->
groupResInfo
))
{
doSetOperatorCompleted
(
pOperator
);
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录