Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
5a9d81b1
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
5a9d81b1
编写于
10月 20, 2022
作者:
H
Haojun Liao
提交者:
GitHub
10月 20, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #17536 from taosdata/fix/liao_cov
fix(query): fix no-fill expression calculation bug.
上级
c70006d9
bcb3dcc7
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
89 addition
and
90 deletion
+89
-90
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+10
-13
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+45
-54
source/libs/executor/src/groupoperator.c
source/libs/executor/src/groupoperator.c
+1
-1
source/libs/executor/src/projectoperator.c
source/libs/executor/src/projectoperator.c
+2
-2
source/libs/executor/src/tfill.c
source/libs/executor/src/tfill.c
+17
-7
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+14
-13
未找到文件。
source/libs/executor/inc/executorimpl.h
浏览文件 @
5a9d81b1
...
...
@@ -677,22 +677,20 @@ typedef struct SFillOperatorInfo {
uint64_t
curGroupId
;
// current handled group id
SExprInfo
*
pExprInfo
;
int32_t
numOfExpr
;
SExprInfo
*
pNotFillExprInfo
;
int32_t
numOfNotFillExpr
;
SExprSupp
noFillExprSupp
;
}
SFillOperatorInfo
;
typedef
struct
SGroupbyOperatorInfo
{
SOptrBasicInfo
binfo
;
SAggSupporter
aggSup
;
SArray
*
pGroupCols
;
// group by columns, SArray<SColumn>
SArray
*
pGroupColVals
;
// current group column values, SArray<SGroupKeys>
SNode
*
pCondition
;
bool
isInit
;
// denote if current val is initialized or not
char
*
keyBuf
;
// group by keys for hash
int32_t
groupKeyLen
;
// total group by column width
SGroupResInfo
groupResInfo
;
SExprSupp
scalarSup
;
SArray
*
pGroupCols
;
// group by columns, SArray<SColumn>
SArray
*
pGroupColVals
;
// current group column values, SArray<SGroupKeys>
SNode
*
pCondition
;
bool
isInit
;
// denote if current val is initialized or not
char
*
keyBuf
;
// group by keys for hash
int32_t
groupKeyLen
;
// total group by column width
SGroupResInfo
groupResInfo
;
SExprSupp
scalarSup
;
}
SGroupbyOperatorInfo
;
typedef
struct
SDataGroupInfo
{
...
...
@@ -1016,8 +1014,7 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi
int32_t
projectApplyFunctions
(
SExprInfo
*
pExpr
,
SSDataBlock
*
pResult
,
SSDataBlock
*
pSrcBlock
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
SArray
*
pPseudoList
);
void
setInputDataBlock
(
SOperatorInfo
*
pOperator
,
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
,
int32_t
order
,
int32_t
scanFlag
,
bool
createDummyCol
);
void
setInputDataBlock
(
SExprSupp
*
pExprSupp
,
SSDataBlock
*
pBlock
,
int32_t
order
,
int32_t
scanFlag
,
bool
createDummyCol
);
bool
isTaskKilled
(
SExecTaskInfo
*
pTaskInfo
);
int32_t
checkForQueryBuf
(
size_t
numOfTables
);
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
5a9d81b1
...
...
@@ -403,25 +403,24 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfo
}
}
static
int32_t
doSetInputDataBlock
(
S
OperatorInfo
*
pOperator
,
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
,
int32_t
order
,
int32_t
scanFlag
,
bool
createDummyCol
);
static
int32_t
doSetInputDataBlock
(
S
ExprSupp
*
pExprSup
,
SSDataBlock
*
pBlock
,
int32_t
order
,
int32_t
scanFlag
,
bool
createDummyCol
);
static
void
doSetInputDataBlockInfo
(
S
OperatorInfo
*
pOperator
,
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
,
int32_t
order
)
{
for
(
int32_t
i
=
0
;
i
<
p
Operator
->
exprSupp
.
numOfExprs
;
++
i
)
{
static
void
doSetInputDataBlockInfo
(
S
ExprSupp
*
pExprSup
,
SSDataBlock
*
pBlock
,
int32_t
order
)
{
SqlFunctionCtx
*
pCtx
=
pExprSup
->
pCtx
;
for
(
int32_t
i
=
0
;
i
<
p
ExprSup
->
numOfExprs
;
++
i
)
{
pCtx
[
i
].
order
=
order
;
pCtx
[
i
].
input
.
numOfRows
=
pBlock
->
info
.
rows
;
setBlockSMAInfo
(
&
pCtx
[
i
],
&
p
Operator
->
exprSupp
.
pExprInfo
[
i
],
pBlock
);
setBlockSMAInfo
(
&
pCtx
[
i
],
&
p
ExprSup
->
pExprInfo
[
i
],
pBlock
);
pCtx
[
i
].
pSrcBlock
=
pBlock
;
}
}
void
setInputDataBlock
(
SOperatorInfo
*
pOperator
,
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
,
int32_t
order
,
int32_t
scanFlag
,
bool
createDummyCol
)
{
void
setInputDataBlock
(
SExprSupp
*
pExprSup
,
SSDataBlock
*
pBlock
,
int32_t
order
,
int32_t
scanFlag
,
bool
createDummyCol
)
{
if
(
pBlock
->
pBlockAgg
!=
NULL
)
{
doSetInputDataBlockInfo
(
p
Operator
,
pCtx
,
pBlock
,
order
);
doSetInputDataBlockInfo
(
p
ExprSup
,
pBlock
,
order
);
}
else
{
doSetInputDataBlock
(
p
Operator
,
pCtx
,
pBlock
,
order
,
scanFlag
,
createDummyCol
);
doSetInputDataBlock
(
p
ExprSup
,
pBlock
,
order
,
scanFlag
,
createDummyCol
);
}
}
...
...
@@ -468,11 +467,12 @@ static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunc
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
doSetInputDataBlock
(
S
OperatorInfo
*
pOperator
,
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
,
int32_t
order
,
int32_t
scanFlag
,
bool
createDummyCol
)
{
static
int32_t
doSetInputDataBlock
(
S
ExprSupp
*
pExprSup
,
SSDataBlock
*
pBlock
,
int32_t
order
,
int32_t
scanFlag
,
bool
createDummyCol
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
SqlFunctionCtx
*
pCtx
=
pExprSup
->
pCtx
;
for
(
int32_t
i
=
0
;
i
<
p
Operator
->
exprSupp
.
numOfExprs
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
p
ExprSup
->
numOfExprs
;
++
i
)
{
pCtx
[
i
].
order
=
order
;
pCtx
[
i
].
input
.
numOfRows
=
pBlock
->
info
.
rows
;
...
...
@@ -483,7 +483,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
pInput
->
uid
=
pBlock
->
info
.
uid
;
pInput
->
colDataAggIsSet
=
false
;
SExprInfo
*
pOneExpr
=
&
p
Operator
->
exprSupp
.
pExprInfo
[
i
];
SExprInfo
*
pOneExpr
=
&
p
ExprSup
->
pExprInfo
[
i
];
for
(
int32_t
j
=
0
;
j
<
pOneExpr
->
base
.
numOfParams
;
++
j
)
{
SFunctParam
*
pFuncParam
=
&
pOneExpr
->
base
.
pParam
[
j
];
if
(
pFuncParam
->
type
==
FUNC_PARAM_TYPE_COLUMN
)
{
...
...
@@ -2435,7 +2435,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
// the pDataBlock are always the same one, no need to call this again
setExecutionContext
(
pOperator
,
pOperator
->
exprSupp
.
numOfExprs
,
pBlock
->
info
.
groupId
);
setInputDataBlock
(
p
Operator
,
pSup
->
pCtx
,
pBlock
,
order
,
scanFlag
,
true
);
setInputDataBlock
(
p
Sup
,
pBlock
,
order
,
scanFlag
,
true
);
code
=
doAggregateImpl
(
pOperator
,
pSup
->
pCtx
);
if
(
code
!=
0
)
{
T_LONG_JMP
(
pTaskInfo
->
env
,
code
);
...
...
@@ -2734,28 +2734,16 @@ static void doHandleRemainBlockFromNewGroup(SOperatorInfo* pOperator, SFillOpera
static
void
doApplyScalarCalculation
(
SOperatorInfo
*
pOperator
,
SSDataBlock
*
pBlock
,
int32_t
order
,
int32_t
scanFlag
)
{
SFillOperatorInfo
*
pInfo
=
pOperator
->
info
;
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
SSDataBlock
*
pResBlock
=
pInfo
->
pFinalRes
;
setInputDataBlock
(
pOperator
,
pSup
->
pCtx
,
pBlock
,
order
,
scanFlag
,
false
);
setInputDataBlock
(
pSup
,
pBlock
,
order
,
scanFlag
,
false
);
projectApplyFunctions
(
pSup
->
pExprInfo
,
pInfo
->
pRes
,
pBlock
,
pSup
->
pCtx
,
pSup
->
numOfExprs
,
NULL
);
pInfo
->
pRes
->
info
.
groupId
=
pBlock
->
info
.
groupId
;
SColumnInfoData
*
pDst
=
taosArrayGet
(
pInfo
->
pRes
->
pDataBlock
,
pInfo
->
primaryTsCol
);
SColumnInfoData
*
pSrc
=
taosArrayGet
(
pBlock
->
pDataBlock
,
pInfo
->
primarySrcSlotId
);
colDataAssign
(
pDst
,
pSrc
,
pInfo
->
pRes
->
info
.
rows
,
&
pResBlock
->
info
);
for
(
int32_t
i
=
0
;
i
<
pInfo
->
numOfNotFillExpr
;
++
i
)
{
SFillColInfo
*
pCol
=
&
pInfo
->
pFillInfo
->
pFillCol
[
i
+
pInfo
->
numOfExpr
];
ASSERT
(
pCol
->
notFillCol
);
SExprInfo
*
pExpr
=
pCol
->
pExpr
;
int32_t
srcSlotId
=
pExpr
->
base
.
pParam
[
0
].
pCol
->
slotId
;
int32_t
dstSlotId
=
pExpr
->
base
.
resSchema
.
slotId
;
// reset the row value before applying the no-fill functions to the input data block, which is "pBlock" in this case.
pInfo
->
pRes
->
info
.
rows
=
0
;
SExprSupp
*
pNoFillSupp
=
&
pInfo
->
noFillExprSupp
;
setInputDataBlock
(
pNoFillSupp
,
pBlock
,
order
,
scanFlag
,
false
);
SColumnInfoData
*
pDst1
=
taosArrayGet
(
pInfo
->
pRes
->
pDataBlock
,
dstSlotId
);
SColumnInfoData
*
pSrc1
=
taosArrayGet
(
pBlock
->
pDataBlock
,
srcSlotId
);
colDataAssign
(
pDst1
,
pSrc1
,
pInfo
->
pRes
->
info
.
rows
,
&
pResBlock
->
info
);
}
projectApplyFunctions
(
pNoFillSupp
->
pExprInfo
,
pInfo
->
pRes
,
pBlock
,
pNoFillSupp
->
pCtx
,
pNoFillSupp
->
numOfExprs
,
NULL
);
pInfo
->
pRes
->
info
.
groupId
=
pBlock
->
info
.
groupId
;
}
static
SSDataBlock
*
doFillImpl
(
SOperatorInfo
*
pOperator
)
{
...
...
@@ -3143,10 +3131,7 @@ void destroyFillOperatorInfo(void* param) {
pInfo
->
pRes
=
blockDataDestroy
(
pInfo
->
pRes
);
pInfo
->
pFinalRes
=
blockDataDestroy
(
pInfo
->
pFinalRes
);
if
(
pInfo
->
pNotFillExprInfo
!=
NULL
)
{
destroyExprInfo
(
pInfo
->
pNotFillExprInfo
,
pInfo
->
numOfNotFillExpr
);
taosMemoryFree
(
pInfo
->
pNotFillExprInfo
);
}
cleanupExprSupp
(
&
pInfo
->
noFillExprSupp
);
taosMemoryFreeClear
(
pInfo
->
p
);
taosArrayDestroy
(
pInfo
->
pColMatchColInfo
);
...
...
@@ -3211,11 +3196,12 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t
}
static
bool
isWstartColumnExist
(
SFillOperatorInfo
*
pInfo
)
{
if
(
pInfo
->
n
umOfNotFillExpr
==
0
)
{
if
(
pInfo
->
n
oFillExprSupp
.
numOfExprs
==
0
)
{
return
false
;
}
for
(
int32_t
i
=
0
;
i
<
pInfo
->
numOfNotFillExpr
;
++
i
)
{
SExprInfo
*
exprInfo
=
pInfo
->
pNotFillExprInfo
+
i
;
for
(
int32_t
i
=
0
;
i
<
pInfo
->
noFillExprSupp
.
numOfExprs
;
++
i
)
{
SExprInfo
*
exprInfo
=
pInfo
->
noFillExprSupp
.
pExprInfo
+
i
;
if
(
exprInfo
->
pExpr
->
nodeType
==
QUERY_NODE_COLUMN
&&
exprInfo
->
base
.
numOfParams
==
1
&&
exprInfo
->
base
.
pParam
[
0
].
pCol
->
colType
==
COLUMN_TYPE_WINDOW_START
)
{
return
true
;
...
...
@@ -3224,25 +3210,24 @@ static bool isWstartColumnExist(SFillOperatorInfo* pInfo) {
return
false
;
}
static
int32_t
createWStartTsAsNotFillExpr
(
SFillOperatorInfo
*
pInfo
,
SFillPhysiNode
*
pPhyFillNode
)
{
static
int32_t
createPrimaryTsExprIfNeeded
(
SFillOperatorInfo
*
pInfo
,
SFillPhysiNode
*
pPhyFillNode
,
SExprSupp
*
pExprSupp
,
const
char
*
idStr
)
{
bool
wstartExist
=
isWstartColumnExist
(
pInfo
);
if
(
wstartExist
==
false
)
{
if
(
pPhyFillNode
->
pWStartTs
->
type
!=
QUERY_NODE_TARGET
)
{
qError
(
"pWStartTs of fill physical node is not a target node
"
);
qError
(
"pWStartTs of fill physical node is not a target node
, %s"
,
idStr
);
return
TSDB_CODE_QRY_SYS_ERROR
;
}
SExprInfo
*
notFillExprs
=
taosMemoryRealloc
(
pInfo
->
pNotFillExprInfo
,
(
pInfo
->
numOfNotFillExpr
+
1
)
*
sizeof
(
SExprInfo
));
if
(
notFillExprs
==
NULL
)
{
SExprInfo
*
pExpr
=
taosMemoryRealloc
(
pExprSupp
->
pExprInfo
,
(
pExprSupp
->
numOfExprs
+
1
)
*
sizeof
(
SExprInfo
));
if
(
pExpr
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
createExprFromTargetNode
(
notFillExprs
+
pInfo
->
numOfNotFillExpr
,
(
STargetNode
*
)
pPhyFillNode
->
pWStartTs
);
++
pInfo
->
numOfNotFillExpr
;
pInfo
->
pNotFillExprInfo
=
notFillExprs
;
return
TSDB_CODE_SUCCESS
;
createExprFromTargetNode
(
&
pExpr
[
pExprSupp
->
numOfExprs
],
(
STargetNode
*
)
pPhyFillNode
->
pWStartTs
);
pExprSupp
->
numOfExprs
+=
1
;
pExprSupp
->
pExprInfo
=
pExpr
;
}
return
TSDB_CODE_SUCCESS
;
...
...
@@ -3260,8 +3245,14 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
SExprInfo
*
pExprInfo
=
createExprInfo
(
pPhyFillNode
->
pFillExprs
,
NULL
,
&
pInfo
->
numOfExpr
);
pOperator
->
exprSupp
.
pExprInfo
=
pExprInfo
;
pInfo
->
pNotFillExprInfo
=
createExprInfo
(
pPhyFillNode
->
pNotFillExprs
,
NULL
,
&
pInfo
->
numOfNotFillExpr
);
int32_t
code
=
createWStartTsAsNotFillExpr
(
pInfo
,
pPhyFillNode
);
SExprSupp
*
pNoFillSupp
=
&
pInfo
->
noFillExprSupp
;
pNoFillSupp
->
pExprInfo
=
createExprInfo
(
pPhyFillNode
->
pNotFillExprs
,
NULL
,
&
pNoFillSupp
->
numOfExprs
);
int32_t
code
=
createPrimaryTsExprIfNeeded
(
pInfo
,
pPhyFillNode
,
pNoFillSupp
,
pTaskInfo
->
id
.
str
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
code
=
initExprSupp
(
pNoFillSupp
,
pNoFillSupp
->
pExprInfo
,
pNoFillSupp
->
numOfExprs
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
...
...
@@ -3290,7 +3281,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
pInfo
->
pColMatchColInfo
=
extractColMatchInfo
(
pPhyFillNode
->
pFillExprs
,
pPhyFillNode
->
node
.
pOutputDataBlockDesc
,
&
numOfOutputCols
,
COL_MATCH_FROM_SLOT_ID
);
code
=
initFillInfo
(
pInfo
,
pExprInfo
,
pInfo
->
numOfExpr
,
p
Info
->
pNotFillExprInfo
,
pInfo
->
numOfNotFillExpr
,
code
=
initFillInfo
(
pInfo
,
pExprInfo
,
pInfo
->
numOfExpr
,
p
NoFillSupp
->
pExprInfo
,
pNoFillSupp
->
numOfExprs
,
(
SNodeListNode
*
)
pPhyFillNode
->
pValues
,
pPhyFillNode
->
timeRange
,
pResultInfo
->
capacity
,
pTaskInfo
->
id
.
str
,
pInterval
,
type
,
order
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
source/libs/executor/src/groupoperator.c
浏览文件 @
5a9d81b1
...
...
@@ -360,7 +360,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
}
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock
(
pOperator
,
pOperator
->
exprSupp
.
pCtx
,
pBlock
,
order
,
scanFlag
,
true
);
setInputDataBlock
(
&
pOperator
->
exprSupp
,
pBlock
,
order
,
scanFlag
,
true
);
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
if
(
pInfo
->
scalarSup
.
pExprInfo
!=
NULL
)
{
...
...
source/libs/executor/src/projectoperator.c
浏览文件 @
5a9d81b1
...
...
@@ -285,7 +285,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
T_LONG_JMP
(
pTaskInfo
->
env
,
code
);
}
setInputDataBlock
(
p
Operator
,
pSup
->
pCtx
,
pBlock
,
order
,
scanFlag
,
false
);
setInputDataBlock
(
p
Sup
,
pBlock
,
order
,
scanFlag
,
false
);
blockDataEnsureCapacity
(
pInfo
->
pRes
,
pInfo
->
pRes
->
info
.
rows
+
pBlock
->
info
.
rows
);
code
=
projectApplyFunctions
(
pSup
->
pExprInfo
,
pInfo
->
pRes
,
pBlock
,
pSup
->
pCtx
,
pSup
->
numOfExprs
,
...
...
@@ -446,7 +446,7 @@ static void doHandleDataBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, SOp
}
}
setInputDataBlock
(
p
Operator
,
pSup
->
pCtx
,
pBlock
,
order
,
scanFlag
,
false
);
setInputDataBlock
(
p
Sup
,
pBlock
,
order
,
scanFlag
,
false
);
blockDataEnsureCapacity
(
pInfo
->
pRes
,
pInfo
->
pRes
->
info
.
rows
+
pBlock
->
info
.
rows
);
code
=
projectApplyFunctions
(
pSup
->
pExprInfo
,
pInfo
->
pRes
,
pBlock
,
pSup
->
pCtx
,
pSup
->
numOfExprs
,
...
...
source/libs/executor/src/tfill.c
浏览文件 @
5a9d81b1
...
...
@@ -272,7 +272,17 @@ static void copyCurrentRowIntoBuf(SFillInfo* pFillInfo, int32_t rowIndex, SArray
char
*
p
=
colDataGetData
(
pSrcCol
,
rowIndex
);
saveColData
(
pRow
,
i
,
p
,
isNull
);
}
else
if
(
type
==
QUERY_NODE_OPERATOR
)
{
SColumnInfoData
*
pSrcCol
=
taosArrayGet
(
pFillInfo
->
pSrcBlock
->
pDataBlock
,
i
);
int32_t
srcSlotId
=
GET_DEST_SLOT_ID
(
&
pFillInfo
->
pFillCol
[
i
]);
SColumnInfoData
*
pSrcCol
=
taosArrayGet
(
pFillInfo
->
pSrcBlock
->
pDataBlock
,
srcSlotId
);
bool
isNull
=
colDataIsNull_s
(
pSrcCol
,
rowIndex
);
char
*
p
=
colDataGetData
(
pSrcCol
,
rowIndex
);
saveColData
(
pRow
,
i
,
p
,
isNull
);
}
else
if
(
type
==
QUERY_NODE_FUNCTION
)
{
int32_t
srcSlotId
=
GET_DEST_SLOT_ID
(
&
pFillInfo
->
pFillCol
[
i
]);
SColumnInfoData
*
pSrcCol
=
taosArrayGet
(
pFillInfo
->
pSrcBlock
->
pDataBlock
,
srcSlotId
);
bool
isNull
=
colDataIsNull_s
(
pSrcCol
,
rowIndex
);
char
*
p
=
colDataGetData
(
pSrcCol
,
rowIndex
);
...
...
@@ -621,8 +631,8 @@ int64_t taosFillResultDataBlock(SFillInfo* pFillInfo, SSDataBlock* p, int32_t ca
int64_t
getFillInfoStart
(
struct
SFillInfo
*
pFillInfo
)
{
return
pFillInfo
->
start
;
}
SFillColInfo
*
createFillColInfo
(
SExprInfo
*
pExpr
,
int32_t
numOfFillExpr
,
SExprInfo
*
pNotFillExpr
,
int32_t
numOfNo
t
FillExpr
,
const
struct
SNodeListNode
*
pValNode
)
{
SFillColInfo
*
pFillCol
=
taosMemoryCalloc
(
numOfFillExpr
+
numOfNo
t
FillExpr
,
sizeof
(
SFillColInfo
));
int32_t
numOfNoFillExpr
,
const
struct
SNodeListNode
*
pValNode
)
{
SFillColInfo
*
pFillCol
=
taosMemoryCalloc
(
numOfFillExpr
+
numOfNoFillExpr
,
sizeof
(
SFillColInfo
));
if
(
pFillCol
==
NULL
)
{
return
NULL
;
}
...
...
@@ -643,7 +653,7 @@ SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfFillExpr, SExprIn
}
}
for
(
int32_t
i
=
0
;
i
<
numOfNo
t
FillExpr
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
numOfNoFillExpr
;
++
i
)
{
SExprInfo
*
pExprInfo
=
&
pNotFillExpr
[
i
];
pFillCol
[
i
+
numOfFillExpr
].
pExpr
=
pExprInfo
;
pFillCol
[
i
+
numOfFillExpr
].
notFillCol
=
true
;
...
...
@@ -1403,7 +1413,7 @@ static void doApplyStreamScalarCalculation(SOperatorInfo* pOperator, SSDataBlock
blockDataCleanup
(
pDstBlock
);
blockDataEnsureCapacity
(
pDstBlock
,
pSrcBlock
->
info
.
rows
);
setInputDataBlock
(
p
Operator
,
pSup
->
pCtx
,
pSrcBlock
,
TSDB_ORDER_ASC
,
MAIN_SCAN
,
false
);
setInputDataBlock
(
p
Sup
,
pSrcBlock
,
TSDB_ORDER_ASC
,
MAIN_SCAN
,
false
);
projectApplyFunctions
(
pSup
->
pExprInfo
,
pDstBlock
,
pSrcBlock
,
pSup
->
pCtx
,
pSup
->
numOfExprs
,
NULL
);
pDstBlock
->
info
.
groupId
=
pSrcBlock
->
info
.
groupId
;
...
...
@@ -1553,8 +1563,8 @@ static SStreamFillSupporter* initStreamFillSup(SStreamFillPhysiNode* pPhyFillNod
}
pFillSup
->
numOfFillCols
=
numOfFillCols
;
int32_t
numOfNotFillCols
=
0
;
SExprInfo
*
pNot
FillExprInfo
=
createExprInfo
(
pPhyFillNode
->
pNotFillExprs
,
NULL
,
&
numOfNotFillCols
);
pFillSup
->
pAllColInfo
=
createFillColInfo
(
pFillExprInfo
,
pFillSup
->
numOfFillCols
,
pNot
FillExprInfo
,
numOfNotFillCols
,
SExprInfo
*
no
FillExprInfo
=
createExprInfo
(
pPhyFillNode
->
pNotFillExprs
,
NULL
,
&
numOfNotFillCols
);
pFillSup
->
pAllColInfo
=
createFillColInfo
(
pFillExprInfo
,
pFillSup
->
numOfFillCols
,
no
FillExprInfo
,
numOfNotFillCols
,
(
const
SNodeListNode
*
)(
pPhyFillNode
->
pValues
));
pFillSup
->
type
=
convertFillType
(
pPhyFillNode
->
mode
);
pFillSup
->
numOfAllCols
=
pFillSup
->
numOfFillCols
+
numOfNotFillCols
;
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
5a9d81b1
...
...
@@ -1079,7 +1079,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
}
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock
(
p
Operator
,
pSup
->
pCtx
,
pBlock
,
pInfo
->
inputOrder
,
scanFlag
,
true
);
setInputDataBlock
(
p
Sup
,
pBlock
,
pInfo
->
inputOrder
,
scanFlag
,
true
);
blockDataUpdateTsWindow
(
pBlock
,
pInfo
->
primaryTsIndex
);
hashIntervalAgg
(
pOperator
,
&
pInfo
->
binfo
.
resultRowInfo
,
pBlock
,
scanFlag
);
...
...
@@ -1209,7 +1209,7 @@ static int32_t openStateWindowAggOptr(SOperatorInfo* pOperator) {
break
;
}
setInputDataBlock
(
p
Operator
,
pSup
->
pCtx
,
pBlock
,
order
,
MAIN_SCAN
,
true
);
setInputDataBlock
(
p
Sup
,
pBlock
,
order
,
MAIN_SCAN
,
true
);
blockDataUpdateTsWindow
(
pBlock
,
pInfo
->
tsSlotId
);
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
...
...
@@ -1944,7 +1944,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
}
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock
(
p
Operator
,
pSup
->
pCtx
,
pBlock
,
order
,
MAIN_SCAN
,
true
);
setInputDataBlock
(
p
Sup
,
pBlock
,
order
,
MAIN_SCAN
,
true
);
blockDataUpdateTsWindow
(
pBlock
,
pInfo
->
tsSlotId
);
doSessionWindowAggImpl
(
pOperator
,
pInfo
,
pBlock
);
...
...
@@ -2295,7 +2295,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
}
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock
(
p
Operator
,
pSup
->
pCtx
,
pBlock
,
order
,
MAIN_SCAN
,
true
);
setInputDataBlock
(
p
Sup
,
pBlock
,
order
,
MAIN_SCAN
,
true
);
SColumnInfoData
*
pTsCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
pSliceInfo
->
tsCol
.
slotId
);
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
rows
;
++
i
)
{
...
...
@@ -3268,7 +3268,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SExprSupp
*
pExprSup
=
&
pInfo
->
scalarSupp
;
projectApplyFunctions
(
pExprSup
->
pExprInfo
,
pBlock
,
pBlock
,
pExprSup
->
pCtx
,
pExprSup
->
numOfExprs
,
NULL
);
}
setInputDataBlock
(
p
Operator
,
pSup
->
pCtx
,
pBlock
,
TSDB_ORDER_ASC
,
MAIN_SCAN
,
true
);
setInputDataBlock
(
p
Sup
,
pBlock
,
TSDB_ORDER_ASC
,
MAIN_SCAN
,
true
);
doStreamIntervalAggImpl
(
pOperator
,
pBlock
,
pBlock
->
info
.
groupId
,
pUpdatedMap
);
if
(
IS_FINAL_OP
(
pInfo
))
{
int32_t
chIndex
=
getChildIndex
(
pBlock
);
...
...
@@ -3286,7 +3286,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
}
SOperatorInfo
*
pChildOp
=
taosArrayGetP
(
pInfo
->
pChildren
,
chIndex
);
SStreamIntervalOperatorInfo
*
pChInfo
=
pChildOp
->
info
;
setInputDataBlock
(
pChildOp
,
pChildOp
->
exprSupp
.
pCtx
,
pBlock
,
TSDB_ORDER_ASC
,
MAIN_SCAN
,
true
);
setInputDataBlock
(
&
pChildOp
->
exprSupp
,
pBlock
,
TSDB_ORDER_ASC
,
MAIN_SCAN
,
true
);
doStreamIntervalAggImpl
(
pChildOp
,
pBlock
,
pBlock
->
info
.
groupId
,
NULL
);
}
maxTs
=
TMAX
(
maxTs
,
pBlock
->
info
.
window
.
ekey
);
...
...
@@ -4120,7 +4120,8 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
SExprSupp
*
pExprSup
=
&
pInfo
->
scalarSupp
;
projectApplyFunctions
(
pExprSup
->
pExprInfo
,
pBlock
,
pBlock
,
pExprSup
->
pCtx
,
pExprSup
->
numOfExprs
,
NULL
);
}
setInputDataBlock
(
pOperator
,
pSup
->
pCtx
,
pBlock
,
TSDB_ORDER_ASC
,
MAIN_SCAN
,
true
);
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock
(
pSup
,
pBlock
,
TSDB_ORDER_ASC
,
MAIN_SCAN
,
true
);
doStreamSessionAggImpl
(
pOperator
,
pBlock
,
pStUpdated
,
pInfo
->
pStDeleted
,
IS_FINAL_OP
(
pInfo
));
if
(
IS_FINAL_OP
(
pInfo
))
{
int32_t
chIndex
=
getChildIndex
(
pBlock
);
...
...
@@ -4135,7 +4136,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
taosArrayPush
(
pInfo
->
pChildren
,
&
pChildOp
);
}
SOperatorInfo
*
pChildOp
=
taosArrayGetP
(
pInfo
->
pChildren
,
chIndex
);
setInputDataBlock
(
pChildOp
,
pChildOp
->
exprSupp
.
pCtx
,
pBlock
,
TSDB_ORDER_ASC
,
MAIN_SCAN
,
true
);
setInputDataBlock
(
&
pChildOp
->
exprSupp
,
pBlock
,
TSDB_ORDER_ASC
,
MAIN_SCAN
,
true
);
doStreamSessionAggImpl
(
pChildOp
,
pBlock
,
NULL
,
NULL
,
true
);
}
maxTs
=
TMAX
(
maxTs
,
pBlock
->
info
.
window
.
ekey
);
...
...
@@ -4334,7 +4335,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
projectApplyFunctions
(
pExprSup
->
pExprInfo
,
pBlock
,
pBlock
,
pExprSup
->
pCtx
,
pExprSup
->
numOfExprs
,
NULL
);
}
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock
(
p
Operator
,
pSup
->
pCtx
,
pBlock
,
TSDB_ORDER_ASC
,
MAIN_SCAN
,
true
);
setInputDataBlock
(
p
Sup
,
pBlock
,
TSDB_ORDER_ASC
,
MAIN_SCAN
,
true
);
doStreamSessionAggImpl
(
pOperator
,
pBlock
,
pStUpdated
,
NULL
,
false
);
maxTs
=
TMAX
(
pInfo
->
twAggSup
.
maxTs
,
pBlock
->
info
.
window
.
ekey
);
}
...
...
@@ -4647,7 +4648,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
projectApplyFunctions
(
pExprSup
->
pExprInfo
,
pBlock
,
pBlock
,
pExprSup
->
pCtx
,
pExprSup
->
numOfExprs
,
NULL
);
}
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock
(
p
Operator
,
pSup
->
pCtx
,
pBlock
,
TSDB_ORDER_ASC
,
MAIN_SCAN
,
true
);
setInputDataBlock
(
p
Sup
,
pBlock
,
TSDB_ORDER_ASC
,
MAIN_SCAN
,
true
);
doStreamStateAggImpl
(
pOperator
,
pBlock
,
pSeUpdated
,
pInfo
->
pSeDeleted
);
maxTs
=
TMAX
(
maxTs
,
pBlock
->
info
.
window
.
ekey
);
}
...
...
@@ -4917,7 +4918,7 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
}
getTableScanInfo
(
pOperator
,
&
pIaInfo
->
inputOrder
,
&
scanFlag
);
setInputDataBlock
(
p
Operator
,
pSup
->
pCtx
,
pBlock
,
pIaInfo
->
inputOrder
,
scanFlag
,
true
);
setInputDataBlock
(
p
Sup
,
pBlock
,
pIaInfo
->
inputOrder
,
scanFlag
,
true
);
doMergeAlignedIntervalAggImpl
(
pOperator
,
&
pIaInfo
->
binfo
.
resultRowInfo
,
pBlock
,
pRes
);
doFilter
(
pMiaInfo
->
pCondition
,
pRes
,
NULL
,
NULL
);
...
...
@@ -5246,7 +5247,7 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
}
getTableScanInfo
(
pOperator
,
&
iaInfo
->
inputOrder
,
&
scanFlag
);
setInputDataBlock
(
p
Operator
,
pExpSupp
->
pCtx
,
pBlock
,
iaInfo
->
inputOrder
,
scanFlag
,
true
);
setInputDataBlock
(
p
ExpSupp
,
pBlock
,
iaInfo
->
inputOrder
,
scanFlag
,
true
);
doMergeIntervalAggImpl
(
pOperator
,
&
iaInfo
->
binfo
.
resultRowInfo
,
pBlock
,
scanFlag
,
pRes
);
if
(
pRes
->
info
.
rows
>=
pOperator
->
resultInfo
.
threshold
)
{
...
...
@@ -5425,7 +5426,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
// The timewindow that overlaps the timestamps of the input pBlock need to be recalculated and return to the
// caller. Note that all the time window are not close till now.
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock
(
p
Operator
,
pSup
->
pCtx
,
pBlock
,
TSDB_ORDER_ASC
,
MAIN_SCAN
,
true
);
setInputDataBlock
(
p
Sup
,
pBlock
,
TSDB_ORDER_ASC
,
MAIN_SCAN
,
true
);
if
(
pInfo
->
invertible
)
{
setInverFunction
(
pSup
->
pCtx
,
pOperator
->
exprSupp
.
numOfExprs
,
pBlock
->
info
.
type
);
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录