Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
afe03dc9
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1193
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
afe03dc9
编写于
1月 13, 2022
作者:
M
Minglei Jin
提交者:
GitHub
1月 13, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #9738 from taosdata/fix/TS-1029-M
[TS-1029]<fix>(query): added some memory allocation checks
上级
d688cc17
584a0bd4
变更
4
显示空白变更内容
内联
并排
Showing
4 changed file
with
599 addition
and
37 deletion
+599
-37
src/client/src/tscServer.c
src/client/src/tscServer.c
+7
-0
src/client/src/tscSubquery.c
src/client/src/tscSubquery.c
+4
-1
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+580
-36
src/query/src/qFill.c
src/query/src/qFill.c
+8
-0
未找到文件。
src/client/src/tscServer.c
浏览文件 @
afe03dc9
...
@@ -2041,6 +2041,13 @@ int tscProcessRetrieveGlobalMergeRsp(SSqlObj *pSql) {
...
@@ -2041,6 +2041,13 @@ int tscProcessRetrieveGlobalMergeRsp(SSqlObj *pSql) {
tscDebug
(
"0x%"
PRIx64
" create QInfo 0x%"
PRIx64
" to execute query processing"
,
pSql
->
self
,
pSql
->
self
);
tscDebug
(
"0x%"
PRIx64
" create QInfo 0x%"
PRIx64
" to execute query processing"
,
pSql
->
self
,
pSql
->
self
);
pQueryInfo
->
pQInfo
=
createQInfoFromQueryNode
(
pQueryInfo
,
&
tableGroupInfo
,
NULL
,
NULL
,
pRes
->
pMerger
,
MERGE_STAGE
,
pSql
->
self
);
pQueryInfo
->
pQInfo
=
createQInfoFromQueryNode
(
pQueryInfo
,
&
tableGroupInfo
,
NULL
,
NULL
,
pRes
->
pMerger
,
MERGE_STAGE
,
pSql
->
self
);
if
(
pQueryInfo
->
pQInfo
==
NULL
)
{
taosHashCleanup
(
tableGroupInfo
.
map
);
taosArrayDestroy
(
group
);
tscAsyncResultOnError
(
pSql
);
pRes
->
code
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
return
pRes
->
code
;
}
}
}
uint64_t
localQueryId
=
pSql
->
self
;
uint64_t
localQueryId
=
pSql
->
self
;
...
...
src/client/src/tscSubquery.c
浏览文件 @
afe03dc9
...
@@ -3873,8 +3873,11 @@ void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, STableGroupInfo* pTableGr
...
@@ -3873,8 +3873,11 @@ void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, STableGroupInfo* pTableGr
STsBufInfo
bufInfo
=
{
0
};
STsBufInfo
bufInfo
=
{
0
};
SQueryParam
param
=
{.
pOperator
=
pa
};
SQueryParam
param
=
{.
pOperator
=
pa
};
/*int32_t code = */
initQInfo
(
&
bufInfo
,
NULL
,
pSourceOperator
,
pQInfo
,
&
param
,
NULL
,
0
,
merger
);
int32_t
code
=
initQInfo
(
&
bufInfo
,
NULL
,
pSourceOperator
,
pQInfo
,
&
param
,
NULL
,
0
,
merger
);
taosArrayDestroy
(
pa
);
taosArrayDestroy
(
pa
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_cleanup
;
}
return
pQInfo
;
return
pQInfo
;
...
...
src/query/src/qExecutor.c
浏览文件 @
afe03dc9
...
@@ -321,9 +321,17 @@ SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numO
...
@@ -321,9 +321,17 @@ SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numO
const
static
int32_t
minSize
=
8
;
const
static
int32_t
minSize
=
8
;
SSDataBlock
*
res
=
calloc
(
1
,
sizeof
(
SSDataBlock
));
SSDataBlock
*
res
=
calloc
(
1
,
sizeof
(
SSDataBlock
));
res
->
info
.
numOfCols
=
numOfOutput
;
if
(
res
==
NULL
)
{
qError
(
"failed to allocate for output buffer"
);
goto
_clean
;
}
res
->
pDataBlock
=
taosArrayInit
(
numOfOutput
,
sizeof
(
SColumnInfoData
));
res
->
pDataBlock
=
taosArrayInit
(
numOfOutput
,
sizeof
(
SColumnInfoData
));
if
(
res
->
pDataBlock
==
NULL
)
{
qError
(
"failed to init arrary for data block of output buffer"
);
goto
_clean
;
}
for
(
int32_t
i
=
0
;
i
<
numOfOutput
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
numOfOutput
;
++
i
)
{
SColumnInfoData
idata
=
{{
0
}};
SColumnInfoData
idata
=
{{
0
}};
idata
.
info
.
type
=
pExpr
[
i
].
base
.
resType
;
idata
.
info
.
type
=
pExpr
[
i
].
base
.
resType
;
...
@@ -332,10 +340,20 @@ SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numO
...
@@ -332,10 +340,20 @@ SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numO
int32_t
size
=
MAX
(
idata
.
info
.
bytes
*
numOfRows
,
minSize
);
int32_t
size
=
MAX
(
idata
.
info
.
bytes
*
numOfRows
,
minSize
);
idata
.
pData
=
calloc
(
1
,
size
);
// at least to hold a pointer on x64 platform
idata
.
pData
=
calloc
(
1
,
size
);
// at least to hold a pointer on x64 platform
if
(
idata
.
pData
==
NULL
)
{
qError
(
"failed to allocate column buffer for output buffer"
);
goto
_clean
;
}
taosArrayPush
(
res
->
pDataBlock
,
&
idata
);
taosArrayPush
(
res
->
pDataBlock
,
&
idata
);
res
->
info
.
numOfCols
++
;
}
}
return
res
;
return
res
;
_clean:
destroyOutputBuf
(
res
);
return
NULL
;
}
}
void
*
destroyOutputBuf
(
SSDataBlock
*
pBlock
)
{
void
*
destroyOutputBuf
(
SSDataBlock
*
pBlock
)
{
...
@@ -2182,23 +2200,35 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
...
@@ -2182,23 +2200,35 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
switch
(
*
op
)
{
switch
(
*
op
)
{
case
OP_TagScan
:
{
case
OP_TagScan
:
{
pRuntimeEnv
->
proot
=
createTagScanOperatorInfo
(
pRuntimeEnv
,
pQueryAttr
->
pExpr1
,
pQueryAttr
->
numOfOutput
);
pRuntimeEnv
->
proot
=
createTagScanOperatorInfo
(
pRuntimeEnv
,
pQueryAttr
->
pExpr1
,
pQueryAttr
->
numOfOutput
);
if
(
pRuntimeEnv
->
proot
==
NULL
)
{
goto
_clean
;
}
break
;
break
;
}
}
case
OP_MultiTableTimeInterval
:
{
case
OP_MultiTableTimeInterval
:
{
pRuntimeEnv
->
proot
=
pRuntimeEnv
->
proot
=
createMultiTableTimeIntervalOperatorInfo
(
pRuntimeEnv
,
pRuntimeEnv
->
proot
,
pQueryAttr
->
pExpr1
,
pQueryAttr
->
numOfOutput
);
createMultiTableTimeIntervalOperatorInfo
(
pRuntimeEnv
,
pRuntimeEnv
->
proot
,
pQueryAttr
->
pExpr1
,
pQueryAttr
->
numOfOutput
);
if
(
pRuntimeEnv
->
proot
==
NULL
)
{
goto
_clean
;
}
setTableScanFilterOperatorInfo
(
pRuntimeEnv
->
proot
->
upstream
[
0
]
->
info
,
pRuntimeEnv
->
proot
);
setTableScanFilterOperatorInfo
(
pRuntimeEnv
->
proot
->
upstream
[
0
]
->
info
,
pRuntimeEnv
->
proot
);
break
;
break
;
}
}
case
OP_AllMultiTableTimeInterval
:
{
case
OP_AllMultiTableTimeInterval
:
{
pRuntimeEnv
->
proot
=
pRuntimeEnv
->
proot
=
createAllMultiTableTimeIntervalOperatorInfo
(
pRuntimeEnv
,
pRuntimeEnv
->
proot
,
pQueryAttr
->
pExpr1
,
pQueryAttr
->
numOfOutput
);
createAllMultiTableTimeIntervalOperatorInfo
(
pRuntimeEnv
,
pRuntimeEnv
->
proot
,
pQueryAttr
->
pExpr1
,
pQueryAttr
->
numOfOutput
);
if
(
pRuntimeEnv
->
proot
==
NULL
)
{
goto
_clean
;
}
setTableScanFilterOperatorInfo
(
pRuntimeEnv
->
proot
->
upstream
[
0
]
->
info
,
pRuntimeEnv
->
proot
);
setTableScanFilterOperatorInfo
(
pRuntimeEnv
->
proot
->
upstream
[
0
]
->
info
,
pRuntimeEnv
->
proot
);
break
;
break
;
}
}
case
OP_TimeWindow
:
{
case
OP_TimeWindow
:
{
pRuntimeEnv
->
proot
=
pRuntimeEnv
->
proot
=
createTimeIntervalOperatorInfo
(
pRuntimeEnv
,
pRuntimeEnv
->
proot
,
pQueryAttr
->
pExpr1
,
pQueryAttr
->
numOfOutput
);
createTimeIntervalOperatorInfo
(
pRuntimeEnv
,
pRuntimeEnv
->
proot
,
pQueryAttr
->
pExpr1
,
pQueryAttr
->
numOfOutput
);
if
(
pRuntimeEnv
->
proot
==
NULL
)
{
goto
_clean
;
}
int32_t
opType
=
pRuntimeEnv
->
proot
->
upstream
[
0
]
->
operatorType
;
int32_t
opType
=
pRuntimeEnv
->
proot
->
upstream
[
0
]
->
operatorType
;
if
(
opType
!=
OP_DummyInput
&&
opType
!=
OP_Join
)
{
if
(
opType
!=
OP_DummyInput
&&
opType
!=
OP_Join
)
{
setTableScanFilterOperatorInfo
(
pRuntimeEnv
->
proot
->
upstream
[
0
]
->
info
,
pRuntimeEnv
->
proot
);
setTableScanFilterOperatorInfo
(
pRuntimeEnv
->
proot
->
upstream
[
0
]
->
info
,
pRuntimeEnv
->
proot
);
...
@@ -2208,6 +2238,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
...
@@ -2208,6 +2238,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
case
OP_AllTimeWindow
:
{
case
OP_AllTimeWindow
:
{
pRuntimeEnv
->
proot
=
pRuntimeEnv
->
proot
=
createAllTimeIntervalOperatorInfo
(
pRuntimeEnv
,
pRuntimeEnv
->
proot
,
pQueryAttr
->
pExpr1
,
pQueryAttr
->
numOfOutput
);
createAllTimeIntervalOperatorInfo
(
pRuntimeEnv
,
pRuntimeEnv
->
proot
,
pQueryAttr
->
pExpr1
,
pQueryAttr
->
numOfOutput
);
if
(
pRuntimeEnv
->
proot
==
NULL
)
{
goto
_clean
;
}
int32_t
opType
=
pRuntimeEnv
->
proot
->
upstream
[
0
]
->
operatorType
;
int32_t
opType
=
pRuntimeEnv
->
proot
->
upstream
[
0
]
->
operatorType
;
if
(
opType
!=
OP_DummyInput
&&
opType
!=
OP_Join
)
{
if
(
opType
!=
OP_DummyInput
&&
opType
!=
OP_Join
)
{
setTableScanFilterOperatorInfo
(
pRuntimeEnv
->
proot
->
upstream
[
0
]
->
info
,
pRuntimeEnv
->
proot
);
setTableScanFilterOperatorInfo
(
pRuntimeEnv
->
proot
->
upstream
[
0
]
->
info
,
pRuntimeEnv
->
proot
);
...
@@ -2217,6 +2250,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
...
@@ -2217,6 +2250,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
case
OP_Groupby
:
{
case
OP_Groupby
:
{
pRuntimeEnv
->
proot
=
pRuntimeEnv
->
proot
=
createGroupbyOperatorInfo
(
pRuntimeEnv
,
pRuntimeEnv
->
proot
,
pQueryAttr
->
pExpr1
,
pQueryAttr
->
numOfOutput
);
createGroupbyOperatorInfo
(
pRuntimeEnv
,
pRuntimeEnv
->
proot
,
pQueryAttr
->
pExpr1
,
pQueryAttr
->
numOfOutput
);
if
(
pRuntimeEnv
->
proot
==
NULL
)
{
goto
_clean
;
}
int32_t
opType
=
pRuntimeEnv
->
proot
->
upstream
[
0
]
->
operatorType
;
int32_t
opType
=
pRuntimeEnv
->
proot
->
upstream
[
0
]
->
operatorType
;
if
(
opType
!=
OP_DummyInput
)
{
if
(
opType
!=
OP_DummyInput
)
{
...
@@ -2227,6 +2263,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
...
@@ -2227,6 +2263,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
case
OP_SessionWindow
:
{
case
OP_SessionWindow
:
{
pRuntimeEnv
->
proot
=
pRuntimeEnv
->
proot
=
createSWindowOperatorInfo
(
pRuntimeEnv
,
pRuntimeEnv
->
proot
,
pQueryAttr
->
pExpr1
,
pQueryAttr
->
numOfOutput
);
createSWindowOperatorInfo
(
pRuntimeEnv
,
pRuntimeEnv
->
proot
,
pQueryAttr
->
pExpr1
,
pQueryAttr
->
numOfOutput
);
if
(
pRuntimeEnv
->
proot
==
NULL
)
{
goto
_clean
;
}
int32_t
opType
=
pRuntimeEnv
->
proot
->
upstream
[
0
]
->
operatorType
;
int32_t
opType
=
pRuntimeEnv
->
proot
->
upstream
[
0
]
->
operatorType
;
if
(
opType
!=
OP_DummyInput
)
{
if
(
opType
!=
OP_DummyInput
)
{
setTableScanFilterOperatorInfo
(
pRuntimeEnv
->
proot
->
upstream
[
0
]
->
info
,
pRuntimeEnv
->
proot
);
setTableScanFilterOperatorInfo
(
pRuntimeEnv
->
proot
->
upstream
[
0
]
->
info
,
pRuntimeEnv
->
proot
);
...
@@ -2236,12 +2275,18 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
...
@@ -2236,12 +2275,18 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
case
OP_MultiTableAggregate
:
{
case
OP_MultiTableAggregate
:
{
pRuntimeEnv
->
proot
=
pRuntimeEnv
->
proot
=
createMultiTableAggOperatorInfo
(
pRuntimeEnv
,
pRuntimeEnv
->
proot
,
pQueryAttr
->
pExpr1
,
pQueryAttr
->
numOfOutput
);
createMultiTableAggOperatorInfo
(
pRuntimeEnv
,
pRuntimeEnv
->
proot
,
pQueryAttr
->
pExpr1
,
pQueryAttr
->
numOfOutput
);
if
(
pRuntimeEnv
->
proot
==
NULL
)
{
goto
_clean
;
}
setTableScanFilterOperatorInfo
(
pRuntimeEnv
->
proot
->
upstream
[
0
]
->
info
,
pRuntimeEnv
->
proot
);
setTableScanFilterOperatorInfo
(
pRuntimeEnv
->
proot
->
upstream
[
0
]
->
info
,
pRuntimeEnv
->
proot
);
break
;
break
;
}
}
case
OP_Aggregate
:
{
case
OP_Aggregate
:
{
pRuntimeEnv
->
proot
=
pRuntimeEnv
->
proot
=
createAggregateOperatorInfo
(
pRuntimeEnv
,
pRuntimeEnv
->
proot
,
pQueryAttr
->
pExpr1
,
pQueryAttr
->
numOfOutput
);
createAggregateOperatorInfo
(
pRuntimeEnv
,
pRuntimeEnv
->
proot
,
pQueryAttr
->
pExpr1
,
pQueryAttr
->
numOfOutput
);
if
(
pRuntimeEnv
->
proot
==
NULL
)
{
goto
_clean
;
}
int32_t
opType
=
pRuntimeEnv
->
proot
->
upstream
[
0
]
->
operatorType
;
int32_t
opType
=
pRuntimeEnv
->
proot
->
upstream
[
0
]
->
operatorType
;
if
(
opType
!=
OP_DummyInput
&&
opType
!=
OP_Join
)
{
if
(
opType
!=
OP_DummyInput
&&
opType
!=
OP_Join
)
{
...
@@ -2262,11 +2307,18 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
...
@@ -2262,11 +2307,18 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
assert
(
pQueryAttr
->
pExpr2
!=
NULL
);
assert
(
pQueryAttr
->
pExpr2
!=
NULL
);
pRuntimeEnv
->
proot
=
createProjectOperatorInfo
(
pRuntimeEnv
,
prev
,
pQueryAttr
->
pExpr2
,
pQueryAttr
->
numOfExpr2
);
pRuntimeEnv
->
proot
=
createProjectOperatorInfo
(
pRuntimeEnv
,
prev
,
pQueryAttr
->
pExpr2
,
pQueryAttr
->
numOfExpr2
);
}
}
if
(
pRuntimeEnv
->
proot
==
NULL
)
{
goto
_clean
;
}
break
;
break
;
}
}
case
OP_StateWindow
:
{
case
OP_StateWindow
:
{
pRuntimeEnv
->
proot
=
createStatewindowOperatorInfo
(
pRuntimeEnv
,
pRuntimeEnv
->
proot
,
pQueryAttr
->
pExpr1
,
pQueryAttr
->
numOfOutput
);
pRuntimeEnv
->
proot
=
createStatewindowOperatorInfo
(
pRuntimeEnv
,
pRuntimeEnv
->
proot
,
pQueryAttr
->
pExpr1
,
pQueryAttr
->
numOfOutput
);
if
(
pRuntimeEnv
->
proot
==
NULL
)
{
goto
_clean
;
}
int32_t
opType
=
pRuntimeEnv
->
proot
->
upstream
[
0
]
->
operatorType
;
int32_t
opType
=
pRuntimeEnv
->
proot
->
upstream
[
0
]
->
operatorType
;
if
(
opType
!=
OP_DummyInput
)
{
if
(
opType
!=
OP_DummyInput
)
{
setTableScanFilterOperatorInfo
(
pRuntimeEnv
->
proot
->
upstream
[
0
]
->
info
,
pRuntimeEnv
->
proot
);
setTableScanFilterOperatorInfo
(
pRuntimeEnv
->
proot
->
upstream
[
0
]
->
info
,
pRuntimeEnv
->
proot
);
...
@@ -2276,6 +2328,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
...
@@ -2276,6 +2328,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
case
OP_Limit
:
{
case
OP_Limit
:
{
pRuntimeEnv
->
proot
=
createLimitOperatorInfo
(
pRuntimeEnv
,
pRuntimeEnv
->
proot
);
pRuntimeEnv
->
proot
=
createLimitOperatorInfo
(
pRuntimeEnv
,
pRuntimeEnv
->
proot
);
if
(
pRuntimeEnv
->
proot
==
NULL
)
{
goto
_clean
;
}
break
;
break
;
}
}
...
@@ -2287,12 +2342,18 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
...
@@ -2287,12 +2342,18 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
pRuntimeEnv
->
proot
=
createFilterOperatorInfo
(
pRuntimeEnv
,
pRuntimeEnv
->
proot
,
pQueryAttr
->
pExpr3
,
pRuntimeEnv
->
proot
=
createFilterOperatorInfo
(
pRuntimeEnv
,
pRuntimeEnv
->
proot
,
pQueryAttr
->
pExpr3
,
pQueryAttr
->
numOfExpr3
,
pColInfo
,
numOfFilterCols
);
pQueryAttr
->
numOfExpr3
,
pColInfo
,
numOfFilterCols
);
freeColumnInfo
(
pColInfo
,
pQueryAttr
->
numOfExpr3
);
freeColumnInfo
(
pColInfo
,
pQueryAttr
->
numOfExpr3
);
if
(
pRuntimeEnv
->
proot
==
NULL
)
{
goto
_clean
;
}
}
else
{
}
else
{
SColumnInfo
*
pColInfo
=
SColumnInfo
*
pColInfo
=
extractColumnFilterInfo
(
pQueryAttr
->
pExpr1
,
pQueryAttr
->
numOfOutput
,
&
numOfFilterCols
);
extractColumnFilterInfo
(
pQueryAttr
->
pExpr1
,
pQueryAttr
->
numOfOutput
,
&
numOfFilterCols
);
pRuntimeEnv
->
proot
=
createFilterOperatorInfo
(
pRuntimeEnv
,
pRuntimeEnv
->
proot
,
pQueryAttr
->
pExpr1
,
pRuntimeEnv
->
proot
=
createFilterOperatorInfo
(
pRuntimeEnv
,
pRuntimeEnv
->
proot
,
pQueryAttr
->
pExpr1
,
pQueryAttr
->
numOfOutput
,
pColInfo
,
numOfFilterCols
);
pQueryAttr
->
numOfOutput
,
pColInfo
,
numOfFilterCols
);
freeColumnInfo
(
pColInfo
,
pQueryAttr
->
numOfOutput
);
freeColumnInfo
(
pColInfo
,
pQueryAttr
->
numOfOutput
);
if
(
pRuntimeEnv
->
proot
==
NULL
)
{
goto
_clean
;
}
}
}
break
;
break
;
...
@@ -2301,11 +2362,17 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
...
@@ -2301,11 +2362,17 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
case
OP_Fill
:
{
case
OP_Fill
:
{
SOperatorInfo
*
pInfo
=
pRuntimeEnv
->
proot
;
SOperatorInfo
*
pInfo
=
pRuntimeEnv
->
proot
;
pRuntimeEnv
->
proot
=
createFillOperatorInfo
(
pRuntimeEnv
,
pInfo
,
pInfo
->
pExpr
,
pInfo
->
numOfOutput
,
pQueryAttr
->
multigroupResult
);
pRuntimeEnv
->
proot
=
createFillOperatorInfo
(
pRuntimeEnv
,
pInfo
,
pInfo
->
pExpr
,
pInfo
->
numOfOutput
,
pQueryAttr
->
multigroupResult
);
if
(
pRuntimeEnv
->
proot
==
NULL
)
{
goto
_clean
;
}
break
;
break
;
}
}
case
OP_MultiwayMergeSort
:
{
case
OP_MultiwayMergeSort
:
{
pRuntimeEnv
->
proot
=
createMultiwaySortOperatorInfo
(
pRuntimeEnv
,
pQueryAttr
->
pExpr1
,
pQueryAttr
->
numOfOutput
,
4096
,
merger
);
pRuntimeEnv
->
proot
=
createMultiwaySortOperatorInfo
(
pRuntimeEnv
,
pQueryAttr
->
pExpr1
,
pQueryAttr
->
numOfOutput
,
4096
,
merger
);
if
(
pRuntimeEnv
->
proot
==
NULL
)
{
goto
_clean
;
}
break
;
break
;
}
}
...
@@ -2317,6 +2384,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
...
@@ -2317,6 +2384,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
pRuntimeEnv
->
proot
=
createGlobalAggregateOperatorInfo
(
pRuntimeEnv
,
pRuntimeEnv
->
proot
,
pQueryAttr
->
pExpr3
,
pRuntimeEnv
->
proot
=
createGlobalAggregateOperatorInfo
(
pRuntimeEnv
,
pRuntimeEnv
->
proot
,
pQueryAttr
->
pExpr3
,
pQueryAttr
->
numOfExpr3
,
merger
,
pQueryAttr
->
pUdfInfo
,
multigroupResult
);
pQueryAttr
->
numOfExpr3
,
merger
,
pQueryAttr
->
pUdfInfo
,
multigroupResult
);
if
(
pRuntimeEnv
->
proot
==
NULL
)
{
goto
_clean
;
}
break
;
break
;
}
}
...
@@ -2324,16 +2394,25 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
...
@@ -2324,16 +2394,25 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
int32_t
num
=
pRuntimeEnv
->
proot
->
numOfOutput
;
int32_t
num
=
pRuntimeEnv
->
proot
->
numOfOutput
;
SExprInfo
*
pExpr
=
pRuntimeEnv
->
proot
->
pExpr
;
SExprInfo
*
pExpr
=
pRuntimeEnv
->
proot
->
pExpr
;
pRuntimeEnv
->
proot
=
createSLimitOperatorInfo
(
pRuntimeEnv
,
pRuntimeEnv
->
proot
,
pExpr
,
num
,
merger
,
pQueryAttr
->
multigroupResult
);
pRuntimeEnv
->
proot
=
createSLimitOperatorInfo
(
pRuntimeEnv
,
pRuntimeEnv
->
proot
,
pExpr
,
num
,
merger
,
pQueryAttr
->
multigroupResult
);
if
(
pRuntimeEnv
->
proot
==
NULL
)
{
goto
_clean
;
}
break
;
break
;
}
}
case
OP_Distinct
:
{
case
OP_Distinct
:
{
pRuntimeEnv
->
proot
=
createDistinctOperatorInfo
(
pRuntimeEnv
,
pRuntimeEnv
->
proot
,
pQueryAttr
->
pExpr1
,
pQueryAttr
->
numOfOutput
);
pRuntimeEnv
->
proot
=
createDistinctOperatorInfo
(
pRuntimeEnv
,
pRuntimeEnv
->
proot
,
pQueryAttr
->
pExpr1
,
pQueryAttr
->
numOfOutput
);
if
(
pRuntimeEnv
->
proot
==
NULL
)
{
goto
_clean
;
}
break
;
break
;
}
}
case
OP_Order
:
{
case
OP_Order
:
{
pRuntimeEnv
->
proot
=
createOrderOperatorInfo
(
pRuntimeEnv
,
pRuntimeEnv
->
proot
,
pQueryAttr
->
pExpr1
,
pQueryAttr
->
numOfOutput
,
&
pQueryAttr
->
order
);
pRuntimeEnv
->
proot
=
createOrderOperatorInfo
(
pRuntimeEnv
,
pRuntimeEnv
->
proot
,
pQueryAttr
->
pExpr1
,
pQueryAttr
->
numOfOutput
,
&
pQueryAttr
->
order
);
if
(
pRuntimeEnv
->
proot
==
NULL
)
{
goto
_clean
;
}
break
;
break
;
}
}
...
@@ -4832,7 +4911,6 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr
...
@@ -4832,7 +4911,6 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr
SQueryAttr
*
pQueryAttr
=
pQInfo
->
runtimeEnv
.
pQueryAttr
;
SQueryAttr
*
pQueryAttr
=
pQInfo
->
runtimeEnv
.
pQueryAttr
;
pQueryAttr
->
tsdb
=
tsdb
;
pQueryAttr
->
tsdb
=
tsdb
;
if
(
tsdb
!=
NULL
)
{
if
(
tsdb
!=
NULL
)
{
int32_t
code
=
setupQueryHandle
(
tsdb
,
pRuntimeEnv
,
pQInfo
->
qId
,
pQueryAttr
->
stableQuery
);
int32_t
code
=
setupQueryHandle
(
tsdb
,
pRuntimeEnv
,
pQInfo
->
qId
,
pQueryAttr
->
stableQuery
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
@@ -4853,18 +4931,30 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr
...
@@ -4853,18 +4931,30 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr
switch
(
tbScanner
)
{
switch
(
tbScanner
)
{
case
OP_TableBlockInfoScan
:
{
case
OP_TableBlockInfoScan
:
{
pRuntimeEnv
->
proot
=
createTableBlockInfoScanOperator
(
pRuntimeEnv
->
pQueryHandle
,
pRuntimeEnv
);
pRuntimeEnv
->
proot
=
createTableBlockInfoScanOperator
(
pRuntimeEnv
->
pQueryHandle
,
pRuntimeEnv
);
if
(
pRuntimeEnv
->
proot
==
NULL
)
{
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
}
break
;
break
;
}
}
case
OP_TableSeqScan
:
{
case
OP_TableSeqScan
:
{
pRuntimeEnv
->
proot
=
createTableSeqScanOperator
(
pRuntimeEnv
->
pQueryHandle
,
pRuntimeEnv
);
pRuntimeEnv
->
proot
=
createTableSeqScanOperator
(
pRuntimeEnv
->
pQueryHandle
,
pRuntimeEnv
);
if
(
pRuntimeEnv
->
proot
==
NULL
)
{
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
}
break
;
break
;
}
}
case
OP_DataBlocksOptScan
:
{
case
OP_DataBlocksOptScan
:
{
pRuntimeEnv
->
proot
=
createDataBlocksOptScanInfo
(
pRuntimeEnv
->
pQueryHandle
,
pRuntimeEnv
,
getNumOfScanTimes
(
pQueryAttr
),
pQueryAttr
->
needReverseScan
?
1
:
0
);
pRuntimeEnv
->
proot
=
createDataBlocksOptScanInfo
(
pRuntimeEnv
->
pQueryHandle
,
pRuntimeEnv
,
getNumOfScanTimes
(
pQueryAttr
),
pQueryAttr
->
needReverseScan
?
1
:
0
);
if
(
pRuntimeEnv
->
proot
==
NULL
)
{
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
}
break
;
break
;
}
}
case
OP_TableScan
:
{
case
OP_TableScan
:
{
pRuntimeEnv
->
proot
=
createTableScanOperator
(
pRuntimeEnv
->
pQueryHandle
,
pRuntimeEnv
,
getNumOfScanTimes
(
pQueryAttr
));
pRuntimeEnv
->
proot
=
createTableScanOperator
(
pRuntimeEnv
->
pQueryHandle
,
pRuntimeEnv
,
getNumOfScanTimes
(
pQueryAttr
));
if
(
pRuntimeEnv
->
proot
==
NULL
)
{
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
}
break
;
break
;
}
}
default:
{
// do nothing
default:
{
// do nothing
...
@@ -4929,7 +5019,6 @@ static void doTableQueryInfoTimeWindowCheck(SQueryAttr* pQueryAttr, STableQueryI
...
@@ -4929,7 +5019,6 @@ static void doTableQueryInfoTimeWindowCheck(SQueryAttr* pQueryAttr, STableQueryI
}
}
}
}
STsdbQueryCond
createTsdbQueryCond
(
SQueryAttr
*
pQueryAttr
,
STimeWindow
*
win
)
{
STsdbQueryCond
createTsdbQueryCond
(
SQueryAttr
*
pQueryAttr
,
STimeWindow
*
win
)
{
STsdbQueryCond
cond
=
{
STsdbQueryCond
cond
=
{
.
colList
=
pQueryAttr
->
tableCols
,
.
colList
=
pQueryAttr
->
tableCols
,
...
@@ -5170,6 +5259,10 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv*
...
@@ -5170,6 +5259,10 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv*
assert
(
repeatTime
>
0
);
assert
(
repeatTime
>
0
);
STableScanInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
STableScanInfo
));
STableScanInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
STableScanInfo
));
if
(
pInfo
==
NULL
)
{
return
NULL
;
}
pInfo
->
pQueryHandle
=
pTsdbQueryHandle
;
pInfo
->
pQueryHandle
=
pTsdbQueryHandle
;
pInfo
->
times
=
repeatTime
;
pInfo
->
times
=
repeatTime
;
pInfo
->
reverseTimes
=
0
;
pInfo
->
reverseTimes
=
0
;
...
@@ -5178,6 +5271,11 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv*
...
@@ -5178,6 +5271,11 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv*
// pInfo->prevGroupId = -1;
// pInfo->prevGroupId = -1;
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pOperator
==
NULL
)
{
tfree
(
pInfo
);
return
NULL
;
}
pOperator
->
name
=
"TableScanOperator"
;
pOperator
->
name
=
"TableScanOperator"
;
pOperator
->
operatorType
=
OP_TableScan
;
pOperator
->
operatorType
=
OP_TableScan
;
pOperator
->
blockingOptr
=
false
;
pOperator
->
blockingOptr
=
false
;
...
@@ -5192,6 +5290,9 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv*
...
@@ -5192,6 +5290,9 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv*
SOperatorInfo
*
createTableSeqScanOperator
(
void
*
pTsdbQueryHandle
,
SQueryRuntimeEnv
*
pRuntimeEnv
)
{
SOperatorInfo
*
createTableSeqScanOperator
(
void
*
pTsdbQueryHandle
,
SQueryRuntimeEnv
*
pRuntimeEnv
)
{
STableScanInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
STableScanInfo
));
STableScanInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
STableScanInfo
));
if
(
pInfo
==
NULL
)
{
return
NULL
;
}
pInfo
->
pQueryHandle
=
pTsdbQueryHandle
;
pInfo
->
pQueryHandle
=
pTsdbQueryHandle
;
pInfo
->
times
=
1
;
pInfo
->
times
=
1
;
...
@@ -5202,6 +5303,11 @@ SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeE
...
@@ -5202,6 +5303,11 @@ SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeE
pRuntimeEnv
->
enableGroupData
=
true
;
pRuntimeEnv
->
enableGroupData
=
true
;
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pOperator
==
NULL
)
{
tfree
(
pInfo
);
return
NULL
;
}
pOperator
->
name
=
"TableSeqScanOperator"
;
pOperator
->
name
=
"TableSeqScanOperator"
;
pOperator
->
operatorType
=
OP_TableSeqScan
;
pOperator
->
operatorType
=
OP_TableSeqScan
;
pOperator
->
blockingOptr
=
false
;
pOperator
->
blockingOptr
=
false
;
...
@@ -5216,9 +5322,15 @@ SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeE
...
@@ -5216,9 +5322,15 @@ SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeE
SOperatorInfo
*
createTableBlockInfoScanOperator
(
void
*
pTsdbQueryHandle
,
SQueryRuntimeEnv
*
pRuntimeEnv
)
{
SOperatorInfo
*
createTableBlockInfoScanOperator
(
void
*
pTsdbQueryHandle
,
SQueryRuntimeEnv
*
pRuntimeEnv
)
{
STableScanInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
STableScanInfo
));
STableScanInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
STableScanInfo
));
if
(
pInfo
==
NULL
)
{
return
NULL
;
}
pInfo
->
pQueryHandle
=
pTsdbQueryHandle
;
pInfo
->
pQueryHandle
=
pTsdbQueryHandle
;
pInfo
->
block
.
pDataBlock
=
taosArrayInit
(
1
,
sizeof
(
SColumnInfoData
));
pInfo
->
block
.
pDataBlock
=
taosArrayInit
(
1
,
sizeof
(
SColumnInfoData
));
if
(
pInfo
->
block
.
pDataBlock
==
NULL
)
{
goto
_clean
;
}
SColumnInfoData
infoData
=
{{
0
}};
SColumnInfoData
infoData
=
{{
0
}};
infoData
.
info
.
type
=
TSDB_DATA_TYPE_BINARY
;
infoData
.
info
.
type
=
TSDB_DATA_TYPE_BINARY
;
...
@@ -5227,6 +5339,11 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRu
...
@@ -5227,6 +5339,11 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRu
taosArrayPush
(
pInfo
->
block
.
pDataBlock
,
&
infoData
);
taosArrayPush
(
pInfo
->
block
.
pDataBlock
,
&
infoData
);
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pOperator
==
NULL
)
{
taosArrayDestroy
(
pInfo
->
block
.
pDataBlock
);
goto
_clean
;
}
pOperator
->
name
=
"TableBlockInfoScanOperator"
;
pOperator
->
name
=
"TableBlockInfoScanOperator"
;
pOperator
->
operatorType
=
OP_TableBlockInfoScan
;
pOperator
->
operatorType
=
OP_TableBlockInfoScan
;
pOperator
->
blockingOptr
=
false
;
pOperator
->
blockingOptr
=
false
;
...
@@ -5237,6 +5354,11 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRu
...
@@ -5237,6 +5354,11 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRu
pOperator
->
exec
=
doBlockInfoScan
;
pOperator
->
exec
=
doBlockInfoScan
;
return
pOperator
;
return
pOperator
;
_clean:
tfree
(
pInfo
);
return
NULL
;
}
}
void
setTableScanFilterOperatorInfo
(
STableScanInfo
*
pTableScanInfo
,
SOperatorInfo
*
pDownstream
)
{
void
setTableScanFilterOperatorInfo
(
STableScanInfo
*
pTableScanInfo
,
SOperatorInfo
*
pDownstream
)
{
...
@@ -5299,6 +5421,10 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntime
...
@@ -5299,6 +5421,10 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntime
assert
(
repeatTime
>
0
);
assert
(
repeatTime
>
0
);
STableScanInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
STableScanInfo
));
STableScanInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
STableScanInfo
));
if
(
pInfo
==
NULL
)
{
return
NULL
;
}
pInfo
->
pQueryHandle
=
pTsdbQueryHandle
;
pInfo
->
pQueryHandle
=
pTsdbQueryHandle
;
pInfo
->
times
=
repeatTime
;
pInfo
->
times
=
repeatTime
;
pInfo
->
reverseTimes
=
reverseTime
;
pInfo
->
reverseTimes
=
reverseTime
;
...
@@ -5306,6 +5432,11 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntime
...
@@ -5306,6 +5432,11 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntime
pInfo
->
order
=
pRuntimeEnv
->
pQueryAttr
->
order
.
order
;
pInfo
->
order
=
pRuntimeEnv
->
pQueryAttr
->
order
.
order
;
SOperatorInfo
*
pOptr
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
SOperatorInfo
*
pOptr
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pOptr
==
NULL
)
{
tfree
(
pInfo
);
return
NULL
;
}
pOptr
->
name
=
"DataBlocksOptimizedScanOperator"
;
pOptr
->
name
=
"DataBlocksOptimizedScanOperator"
;
pOptr
->
operatorType
=
OP_DataBlocksOptScan
;
pOptr
->
operatorType
=
OP_DataBlocksOptScan
;
pOptr
->
pRuntimeEnv
=
pRuntimeEnv
;
pOptr
->
pRuntimeEnv
=
pRuntimeEnv
;
...
@@ -5326,6 +5457,10 @@ SArray* getOrderCheckColumns(SQueryAttr* pQuery) {
...
@@ -5326,6 +5457,10 @@ SArray* getOrderCheckColumns(SQueryAttr* pQuery) {
pOrderColumns
=
taosArrayInit
(
4
,
sizeof
(
SColIndex
));
pOrderColumns
=
taosArrayInit
(
4
,
sizeof
(
SColIndex
));
}
}
if
(
pOrderColumns
==
NULL
)
{
return
NULL
;
}
if
(
pQuery
->
interval
.
interval
>
0
)
{
if
(
pQuery
->
interval
.
interval
>
0
)
{
if
(
pOrderColumns
==
NULL
)
{
if
(
pOrderColumns
==
NULL
)
{
pOrderColumns
=
taosArrayInit
(
1
,
sizeof
(
SColIndex
));
pOrderColumns
=
taosArrayInit
(
1
,
sizeof
(
SColIndex
));
...
@@ -5393,21 +5528,44 @@ static void destroyGlobalAggOperatorInfo(void* param, int32_t numOfOutput) {
...
@@ -5393,21 +5528,44 @@ static void destroyGlobalAggOperatorInfo(void* param, int32_t numOfOutput) {
SMultiwayMergeInfo
*
pInfo
=
(
SMultiwayMergeInfo
*
)
param
;
SMultiwayMergeInfo
*
pInfo
=
(
SMultiwayMergeInfo
*
)
param
;
destroyBasicOperatorInfo
(
&
pInfo
->
binfo
,
numOfOutput
);
destroyBasicOperatorInfo
(
&
pInfo
->
binfo
,
numOfOutput
);
if
(
pInfo
->
orderColumnList
)
{
taosArrayDestroy
(
pInfo
->
orderColumnList
);
taosArrayDestroy
(
pInfo
->
orderColumnList
);
}
if
(
pInfo
->
groupColumnList
)
{
taosArrayDestroy
(
pInfo
->
groupColumnList
);
taosArrayDestroy
(
pInfo
->
groupColumnList
);
}
if
(
pInfo
->
prevRow
)
{
tfree
(
pInfo
->
prevRow
);
tfree
(
pInfo
->
prevRow
);
}
if
(
pInfo
->
currentGroupColData
)
{
tfree
(
pInfo
->
currentGroupColData
);
tfree
(
pInfo
->
currentGroupColData
);
}
}
}
static
void
destroySlimitOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
static
void
destroySlimitOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
SSLimitOperatorInfo
*
pInfo
=
(
SSLimitOperatorInfo
*
)
param
;
SSLimitOperatorInfo
*
pInfo
=
(
SSLimitOperatorInfo
*
)
param
;
if
(
pInfo
->
orderColumnList
)
{
taosArrayDestroy
(
pInfo
->
orderColumnList
);
taosArrayDestroy
(
pInfo
->
orderColumnList
);
}
if
(
pInfo
->
pRes
)
{
pInfo
->
pRes
=
destroyOutputBuf
(
pInfo
->
pRes
);
pInfo
->
pRes
=
destroyOutputBuf
(
pInfo
->
pRes
);
}
if
(
pInfo
->
prevRow
)
{
tfree
(
pInfo
->
prevRow
);
tfree
(
pInfo
->
prevRow
);
}
}
}
SOperatorInfo
*
createGlobalAggregateOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SOperatorInfo
*
createGlobalAggregateOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
,
void
*
param
,
SArray
*
pUdfInfo
,
bool
groupResultMixedUp
)
{
SExprInfo
*
pExpr
,
int32_t
numOfOutput
,
void
*
param
,
SArray
*
pUdfInfo
,
bool
groupResultMixedUp
)
{
SMultiwayMergeInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
SMultiwayMergeInfo
));
SMultiwayMergeInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
SMultiwayMergeInfo
));
if
(
pInfo
==
NULL
)
{
return
NULL
;
}
pInfo
->
resultRowFactor
=
pInfo
->
resultRowFactor
=
(
int32_t
)(
getRowNumForMultioutput
(
pRuntimeEnv
->
pQueryAttr
,
pRuntimeEnv
->
pQueryAttr
->
topBotQuery
,
false
));
(
int32_t
)(
getRowNumForMultioutput
(
pRuntimeEnv
->
pQueryAttr
,
pRuntimeEnv
->
pQueryAttr
->
topBotQuery
,
false
));
...
@@ -5423,6 +5581,10 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv,
...
@@ -5423,6 +5581,10 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv,
pInfo
->
orderColumnList
=
getOrderCheckColumns
(
pRuntimeEnv
->
pQueryAttr
);
pInfo
->
orderColumnList
=
getOrderCheckColumns
(
pRuntimeEnv
->
pQueryAttr
);
pInfo
->
groupColumnList
=
getResultGroupCheckColumns
(
pRuntimeEnv
->
pQueryAttr
);
pInfo
->
groupColumnList
=
getResultGroupCheckColumns
(
pRuntimeEnv
->
pQueryAttr
);
if
(
pInfo
->
binfo
.
pRes
==
NULL
||
pInfo
->
binfo
.
pCtx
==
NULL
||
pInfo
->
orderColumnList
==
NULL
||
pInfo
->
groupColumnList
==
NULL
)
{
goto
_clean
;
}
// TODO refactor
// TODO refactor
int32_t
len
=
0
;
int32_t
len
=
0
;
for
(
int32_t
i
=
0
;
i
<
numOfOutput
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
numOfOutput
;
++
i
)
{
...
@@ -5442,6 +5604,10 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv,
...
@@ -5442,6 +5604,10 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv,
numOfCols
=
(
pInfo
->
groupColumnList
!=
NULL
)
?
(
int32_t
)
taosArrayGetSize
(
pInfo
->
groupColumnList
)
:
0
;
numOfCols
=
(
pInfo
->
groupColumnList
!=
NULL
)
?
(
int32_t
)
taosArrayGetSize
(
pInfo
->
groupColumnList
)
:
0
;
pInfo
->
currentGroupColData
=
calloc
(
1
,
(
POINTER_BYTES
*
numOfCols
+
len
));
pInfo
->
currentGroupColData
=
calloc
(
1
,
(
POINTER_BYTES
*
numOfCols
+
len
));
if
(
pInfo
->
currentGroupColData
==
NULL
)
{
goto
_clean
;
}
offset
=
POINTER_BYTES
*
numOfCols
;
offset
=
POINTER_BYTES
*
numOfCols
;
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
...
@@ -5452,11 +5618,18 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv,
...
@@ -5452,11 +5618,18 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv,
}
}
initResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
,
8
,
TSDB_DATA_TYPE_INT
);
initResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
,
8
,
TSDB_DATA_TYPE_INT
);
if
(
pInfo
->
binfo
.
resultRowInfo
.
pResult
==
NULL
)
{
goto
_clean
;
}
pInfo
->
seed
=
rand
();
pInfo
->
seed
=
rand
();
setDefaultOutputBuf
(
pRuntimeEnv
,
&
pInfo
->
binfo
,
pInfo
->
seed
,
MERGE_STAGE
);
setDefaultOutputBuf
(
pRuntimeEnv
,
&
pInfo
->
binfo
,
pInfo
->
seed
,
MERGE_STAGE
);
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pOperator
==
NULL
)
{
goto
_clean
;
}
pOperator
->
name
=
"GlobalAggregate"
;
pOperator
->
name
=
"GlobalAggregate"
;
pOperator
->
operatorType
=
OP_GlobalAggregate
;
pOperator
->
operatorType
=
OP_GlobalAggregate
;
pOperator
->
blockingOptr
=
true
;
pOperator
->
blockingOptr
=
true
;
...
@@ -5471,17 +5644,30 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv,
...
@@ -5471,17 +5644,30 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv,
appendUpstream
(
pOperator
,
upstream
);
appendUpstream
(
pOperator
,
upstream
);
return
pOperator
;
return
pOperator
;
_clean:
destroyGlobalAggOperatorInfo
((
void
*
)
pInfo
,
numOfOutput
);
tfree
(
pInfo
);
return
NULL
;
}
}
SOperatorInfo
*
createMultiwaySortOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
,
SOperatorInfo
*
createMultiwaySortOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
,
int32_t
numOfRows
,
void
*
merger
)
{
int32_t
numOfRows
,
void
*
merger
)
{
SMultiwayMergeInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
SMultiwayMergeInfo
));
SMultiwayMergeInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
SMultiwayMergeInfo
));
if
(
pInfo
==
NULL
)
{
return
NULL
;
}
pInfo
->
pMerge
=
merger
;
pInfo
->
pMerge
=
merger
;
pInfo
->
bufCapacity
=
numOfRows
;
pInfo
->
bufCapacity
=
numOfRows
;
pInfo
->
orderColumnList
=
getResultGroupCheckColumns
(
pRuntimeEnv
->
pQueryAttr
);
pInfo
->
orderColumnList
=
getResultGroupCheckColumns
(
pRuntimeEnv
->
pQueryAttr
);
pInfo
->
binfo
.
pRes
=
createOutputBuf
(
pExpr
,
numOfOutput
,
numOfRows
);
pInfo
->
binfo
.
pRes
=
createOutputBuf
(
pExpr
,
numOfOutput
,
numOfRows
);
if
(
pInfo
->
orderColumnList
==
NULL
||
pInfo
->
binfo
.
pRes
==
NULL
)
{
goto
_clean
;
}
{
// todo extract method to create prev compare buffer
{
// todo extract method to create prev compare buffer
int32_t
len
=
0
;
int32_t
len
=
0
;
for
(
int32_t
i
=
0
;
i
<
numOfOutput
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
numOfOutput
;
++
i
)
{
...
@@ -5490,6 +5676,9 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx
...
@@ -5490,6 +5676,9 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx
int32_t
numOfCols
=
(
pInfo
->
orderColumnList
!=
NULL
)
?
(
int32_t
)
taosArrayGetSize
(
pInfo
->
orderColumnList
)
:
0
;
int32_t
numOfCols
=
(
pInfo
->
orderColumnList
!=
NULL
)
?
(
int32_t
)
taosArrayGetSize
(
pInfo
->
orderColumnList
)
:
0
;
pInfo
->
prevRow
=
calloc
(
1
,
(
POINTER_BYTES
*
numOfCols
+
len
));
pInfo
->
prevRow
=
calloc
(
1
,
(
POINTER_BYTES
*
numOfCols
+
len
));
if
(
pInfo
->
prevRow
==
NULL
)
{
goto
_clean
;
}
int32_t
offset
=
POINTER_BYTES
*
numOfCols
;
int32_t
offset
=
POINTER_BYTES
*
numOfCols
;
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
...
@@ -5501,6 +5690,10 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx
...
@@ -5501,6 +5690,10 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx
}
}
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pOperator
==
NULL
)
{
goto
_clean
;
}
pOperator
->
name
=
"MultiwaySortOperator"
;
pOperator
->
name
=
"MultiwaySortOperator"
;
pOperator
->
operatorType
=
OP_MultiwayMergeSort
;
pOperator
->
operatorType
=
OP_MultiwayMergeSort
;
pOperator
->
blockingOptr
=
false
;
pOperator
->
blockingOptr
=
false
;
...
@@ -5512,6 +5705,12 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx
...
@@ -5512,6 +5705,12 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx
pOperator
->
exec
=
doMultiwayMergeSort
;
pOperator
->
exec
=
doMultiwayMergeSort
;
pOperator
->
cleanup
=
destroyGlobalAggOperatorInfo
;
pOperator
->
cleanup
=
destroyGlobalAggOperatorInfo
;
return
pOperator
;
return
pOperator
;
_clean:
destroyGlobalAggOperatorInfo
((
void
*
)
pInfo
,
numOfOutput
);
tfree
(
pInfo
);
return
NULL
;
}
}
static
int32_t
doMergeSDatablock
(
SSDataBlock
*
pDest
,
SSDataBlock
*
pSrc
)
{
static
int32_t
doMergeSDatablock
(
SSDataBlock
*
pDest
,
SSDataBlock
*
pSrc
)
{
...
@@ -5588,11 +5787,22 @@ static SSDataBlock* doSort(void* param, bool* newgroup) {
...
@@ -5588,11 +5787,22 @@ static SSDataBlock* doSort(void* param, bool* newgroup) {
SOperatorInfo
*
createOrderOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
,
SOrderVal
*
pOrderVal
)
{
SOperatorInfo
*
createOrderOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
,
SOrderVal
*
pOrderVal
)
{
SOrderOperatorInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
SOrderOperatorInfo
));
SOrderOperatorInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
SOrderOperatorInfo
));
if
(
pInfo
==
NULL
)
{
return
NULL
;
}
{
{
SSDataBlock
*
pDataBlock
=
calloc
(
1
,
sizeof
(
SSDataBlock
));
SSDataBlock
*
pDataBlock
=
calloc
(
1
,
sizeof
(
SSDataBlock
));
if
(
pDataBlock
==
NULL
)
{
goto
_clean
;
}
pDataBlock
->
pDataBlock
=
taosArrayInit
(
numOfOutput
,
sizeof
(
SColumnInfoData
));
pDataBlock
->
pDataBlock
=
taosArrayInit
(
numOfOutput
,
sizeof
(
SColumnInfoData
));
for
(
int32_t
i
=
0
;
i
<
numOfOutput
;
++
i
)
{
if
(
pDataBlock
->
pDataBlock
==
NULL
)
{
goto
_clean
;
}
for
(
int32_t
i
=
0
;
i
<
numOfOutput
;
++
i
)
{
SColumnInfoData
col
=
{{
0
}};
SColumnInfoData
col
=
{{
0
}};
col
.
info
.
colId
=
pExpr
[
i
].
base
.
colInfo
.
colId
;
col
.
info
.
colId
=
pExpr
[
i
].
base
.
colInfo
.
colId
;
col
.
info
.
bytes
=
pExpr
[
i
].
base
.
colBytes
;
col
.
info
.
bytes
=
pExpr
[
i
].
base
.
colBytes
;
...
@@ -5610,6 +5820,10 @@ SOperatorInfo *createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI
...
@@ -5610,6 +5820,10 @@ SOperatorInfo *createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI
}
}
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pOperator
==
NULL
)
{
goto
_clean
;
}
pOperator
->
name
=
"InMemoryOrder"
;
pOperator
->
name
=
"InMemoryOrder"
;
pOperator
->
operatorType
=
OP_Order
;
pOperator
->
operatorType
=
OP_Order
;
pOperator
->
blockingOptr
=
true
;
pOperator
->
blockingOptr
=
true
;
...
@@ -5621,6 +5835,12 @@ SOperatorInfo *createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI
...
@@ -5621,6 +5835,12 @@ SOperatorInfo *createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI
appendUpstream
(
pOperator
,
upstream
);
appendUpstream
(
pOperator
,
upstream
);
return
pOperator
;
return
pOperator
;
_clean:
destroyOrderOperatorInfo
((
void
*
)
pInfo
,
numOfOutput
);
tfree
(
pInfo
);
return
NULL
;
}
}
static
int32_t
getTableScanOrder
(
STableScanInfo
*
pTableScanInfo
)
{
static
int32_t
getTableScanOrder
(
STableScanInfo
*
pTableScanInfo
)
{
...
@@ -5892,8 +6112,7 @@ static SSDataBlock* doLimit(void* param, bool* newgroup) {
...
@@ -5892,8 +6112,7 @@ static SSDataBlock* doLimit(void* param, bool* newgroup) {
if
(
pRuntimeEnv
->
currentOffset
==
0
)
{
if
(
pRuntimeEnv
->
currentOffset
==
0
)
{
break
;
break
;
}
}
else
if
(
srows
>
0
)
{
else
if
(
srows
>
0
)
{
if
(
pRuntimeEnv
->
currentOffset
-
srows
>=
pBlock
->
info
.
rows
)
{
if
(
pRuntimeEnv
->
currentOffset
-
srows
>=
pBlock
->
info
.
rows
)
{
pRuntimeEnv
->
currentOffset
-=
pBlock
->
info
.
rows
;
pRuntimeEnv
->
currentOffset
-=
pBlock
->
info
.
rows
;
}
else
{
}
else
{
...
@@ -6606,6 +6825,9 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) {
...
@@ -6606,6 +6825,9 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) {
SOperatorInfo
*
createAggregateOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
)
{
SOperatorInfo
*
createAggregateOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
)
{
SAggOperatorInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
SAggOperatorInfo
));
SAggOperatorInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
SAggOperatorInfo
));
if
(
pInfo
==
NULL
)
{
return
NULL
;
}
SQueryAttr
*
pQueryAttr
=
pRuntimeEnv
->
pQueryAttr
;
SQueryAttr
*
pQueryAttr
=
pRuntimeEnv
->
pQueryAttr
;
int32_t
numOfRows
=
(
int32_t
)(
getRowNumForMultioutput
(
pQueryAttr
,
pQueryAttr
->
topBotQuery
,
pQueryAttr
->
stableQuery
));
int32_t
numOfRows
=
(
int32_t
)(
getRowNumForMultioutput
(
pQueryAttr
,
pQueryAttr
->
topBotQuery
,
pQueryAttr
->
stableQuery
));
...
@@ -6615,10 +6837,18 @@ SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera
...
@@ -6615,10 +6837,18 @@ SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera
initResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
,
8
,
TSDB_DATA_TYPE_INT
);
initResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
,
8
,
TSDB_DATA_TYPE_INT
);
if
(
pInfo
->
binfo
.
pRes
==
NULL
||
pInfo
->
binfo
.
pCtx
==
NULL
||
pInfo
->
binfo
.
resultRowInfo
.
pResult
==
NULL
)
{
goto
_clean
;
}
pInfo
->
seed
=
rand
();
pInfo
->
seed
=
rand
();
setDefaultOutputBuf
(
pRuntimeEnv
,
&
pInfo
->
binfo
,
pInfo
->
seed
,
MASTER_SCAN
);
setDefaultOutputBuf
(
pRuntimeEnv
,
&
pInfo
->
binfo
,
pInfo
->
seed
,
MASTER_SCAN
);
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pOperator
==
NULL
)
{
goto
_clean
;
}
pOperator
->
name
=
"TableAggregate"
;
pOperator
->
name
=
"TableAggregate"
;
pOperator
->
operatorType
=
OP_Aggregate
;
pOperator
->
operatorType
=
OP_Aggregate
;
pOperator
->
blockingOptr
=
true
;
pOperator
->
blockingOptr
=
true
;
...
@@ -6633,31 +6863,53 @@ SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera
...
@@ -6633,31 +6863,53 @@ SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera
appendUpstream
(
pOperator
,
upstream
);
appendUpstream
(
pOperator
,
upstream
);
return
pOperator
;
return
pOperator
;
_clean:
destroyAggOperatorInfo
((
void
*
)
pInfo
,
numOfOutput
);
tfree
(
pInfo
);
return
NULL
;
}
}
static
void
doDestroyBasicInfo
(
SOptrBasicInfo
*
pInfo
,
int32_t
numOfOutput
)
{
static
void
doDestroyBasicInfo
(
SOptrBasicInfo
*
pInfo
,
int32_t
numOfOutput
)
{
assert
(
pInfo
!=
NULL
);
assert
(
pInfo
!=
NULL
);
if
(
pInfo
->
pCtx
)
{
destroySQLFunctionCtx
(
pInfo
->
pCtx
,
numOfOutput
);
destroySQLFunctionCtx
(
pInfo
->
pCtx
,
numOfOutput
);
}
if
(
pInfo
->
rowCellInfoOffset
)
{
tfree
(
pInfo
->
rowCellInfoOffset
);
tfree
(
pInfo
->
rowCellInfoOffset
);
}
if
(
pInfo
->
resultRowInfo
.
pResult
)
{
cleanupResultRowInfo
(
&
pInfo
->
resultRowInfo
);
cleanupResultRowInfo
(
&
pInfo
->
resultRowInfo
);
}
if
(
pInfo
->
pRes
)
{
pInfo
->
pRes
=
destroyOutputBuf
(
pInfo
->
pRes
);
pInfo
->
pRes
=
destroyOutputBuf
(
pInfo
->
pRes
);
}
}
}
static
void
destroyBasicOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
static
void
destroyBasicOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
SOptrBasicInfo
*
pInfo
=
(
SOptrBasicInfo
*
)
param
;
SOptrBasicInfo
*
pInfo
=
(
SOptrBasicInfo
*
)
param
;
doDestroyBasicInfo
(
pInfo
,
numOfOutput
);
doDestroyBasicInfo
(
pInfo
,
numOfOutput
);
}
}
static
void
destroyStateWindowOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
static
void
destroyStateWindowOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
SStateWindowOperatorInfo
*
pInfo
=
(
SStateWindowOperatorInfo
*
)
param
;
SStateWindowOperatorInfo
*
pInfo
=
(
SStateWindowOperatorInfo
*
)
param
;
doDestroyBasicInfo
(
&
pInfo
->
binfo
,
numOfOutput
);
doDestroyBasicInfo
(
&
pInfo
->
binfo
,
numOfOutput
);
if
(
pInfo
->
prevData
)
{
tfree
(
pInfo
->
prevData
);
tfree
(
pInfo
->
prevData
);
}
}
}
static
void
destroyAggOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
static
void
destroyAggOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
SAggOperatorInfo
*
pInfo
=
(
SAggOperatorInfo
*
)
param
;
SAggOperatorInfo
*
pInfo
=
(
SAggOperatorInfo
*
)
param
;
doDestroyBasicInfo
(
&
pInfo
->
binfo
,
numOfOutput
);
doDestroyBasicInfo
(
&
pInfo
->
binfo
,
numOfOutput
);
}
}
static
void
destroySWindowOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
static
void
destroySWindowOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
SSWindowOperatorInfo
*
pInfo
=
(
SSWindowOperatorInfo
*
)
param
;
SSWindowOperatorInfo
*
pInfo
=
(
SSWindowOperatorInfo
*
)
param
;
doDestroyBasicInfo
(
&
pInfo
->
binfo
,
numOfOutput
);
doDestroyBasicInfo
(
&
pInfo
->
binfo
,
numOfOutput
);
...
@@ -6665,15 +6917,26 @@ static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) {
...
@@ -6665,15 +6917,26 @@ static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) {
static
void
destroySFillOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
static
void
destroySFillOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
SFillOperatorInfo
*
pInfo
=
(
SFillOperatorInfo
*
)
param
;
SFillOperatorInfo
*
pInfo
=
(
SFillOperatorInfo
*
)
param
;
if
(
pInfo
->
pFillInfo
)
{
pInfo
->
pFillInfo
=
taosDestroyFillInfo
(
pInfo
->
pFillInfo
);
pInfo
->
pFillInfo
=
taosDestroyFillInfo
(
pInfo
->
pFillInfo
);
}
if
(
pInfo
->
pRes
)
{
pInfo
->
pRes
=
destroyOutputBuf
(
pInfo
->
pRes
);
pInfo
->
pRes
=
destroyOutputBuf
(
pInfo
->
pRes
);
}
if
(
pInfo
->
p
)
{
tfree
(
pInfo
->
p
);
tfree
(
pInfo
->
p
);
}
}
}
static
void
destroyGroupbyOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
static
void
destroyGroupbyOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
SGroupbyOperatorInfo
*
pInfo
=
(
SGroupbyOperatorInfo
*
)
param
;
SGroupbyOperatorInfo
*
pInfo
=
(
SGroupbyOperatorInfo
*
)
param
;
doDestroyBasicInfo
(
&
pInfo
->
binfo
,
numOfOutput
);
doDestroyBasicInfo
(
&
pInfo
->
binfo
,
numOfOutput
);
if
(
pInfo
->
prevData
)
{
tfree
(
pInfo
->
prevData
);
tfree
(
pInfo
->
prevData
);
}
}
}
static
void
destroyProjectOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
static
void
destroyProjectOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
...
@@ -6683,12 +6946,16 @@ static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) {
...
@@ -6683,12 +6946,16 @@ static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) {
static
void
destroyTagScanOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
static
void
destroyTagScanOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
STagScanInfo
*
pInfo
=
(
STagScanInfo
*
)
param
;
STagScanInfo
*
pInfo
=
(
STagScanInfo
*
)
param
;
if
(
pInfo
->
pRes
)
{
pInfo
->
pRes
=
destroyOutputBuf
(
pInfo
->
pRes
);
pInfo
->
pRes
=
destroyOutputBuf
(
pInfo
->
pRes
);
}
}
}
static
void
destroyOrderOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
static
void
destroyOrderOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
SOrderOperatorInfo
*
pInfo
=
(
SOrderOperatorInfo
*
)
param
;
SOrderOperatorInfo
*
pInfo
=
(
SOrderOperatorInfo
*
)
param
;
if
(
pInfo
->
pDataBlock
)
{
pInfo
->
pDataBlock
=
destroyOutputBuf
(
pInfo
->
pDataBlock
);
pInfo
->
pDataBlock
=
destroyOutputBuf
(
pInfo
->
pDataBlock
);
}
}
}
static
void
destroyConditionOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
static
void
destroyConditionOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
...
@@ -6698,22 +6965,45 @@ static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput) {
...
@@ -6698,22 +6965,45 @@ static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput) {
static
void
destroyDistinctOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
static
void
destroyDistinctOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
SDistinctOperatorInfo
*
pInfo
=
(
SDistinctOperatorInfo
*
)
param
;
SDistinctOperatorInfo
*
pInfo
=
(
SDistinctOperatorInfo
*
)
param
;
if
(
pInfo
->
pSet
)
{
taosHashCleanup
(
pInfo
->
pSet
);
taosHashCleanup
(
pInfo
->
pSet
);
}
if
(
pInfo
->
buf
)
{
tfree
(
pInfo
->
buf
);
tfree
(
pInfo
->
buf
);
}
if
(
pInfo
->
pDistinctDataInfo
)
{
taosArrayDestroy
(
pInfo
->
pDistinctDataInfo
);
taosArrayDestroy
(
pInfo
->
pDistinctDataInfo
);
}
if
(
pInfo
->
pRes
)
{
pInfo
->
pRes
=
destroyOutputBuf
(
pInfo
->
pRes
);
pInfo
->
pRes
=
destroyOutputBuf
(
pInfo
->
pRes
);
}
}
}
SOperatorInfo
*
createMultiTableAggOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
)
{
SOperatorInfo
*
createMultiTableAggOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
)
{
SAggOperatorInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
SAggOperatorInfo
));
SAggOperatorInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
SAggOperatorInfo
));
if
(
pInfo
==
NULL
)
{
return
NULL
;
}
size_t
tableGroup
=
GET_NUM_OF_TABLEGROUP
(
pRuntimeEnv
);
size_t
tableGroup
=
GET_NUM_OF_TABLEGROUP
(
pRuntimeEnv
);
pInfo
->
binfo
.
pRes
=
createOutputBuf
(
pExpr
,
numOfOutput
,
(
int32_t
)
tableGroup
);
pInfo
->
binfo
.
pRes
=
createOutputBuf
(
pExpr
,
numOfOutput
,
(
int32_t
)
tableGroup
);
pInfo
->
binfo
.
pCtx
=
createSQLFunctionCtx
(
pRuntimeEnv
,
pExpr
,
numOfOutput
,
&
pInfo
->
binfo
.
rowCellInfoOffset
);
pInfo
->
binfo
.
pCtx
=
createSQLFunctionCtx
(
pRuntimeEnv
,
pExpr
,
numOfOutput
,
&
pInfo
->
binfo
.
rowCellInfoOffset
);
initResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
,
(
int32_t
)
tableGroup
,
TSDB_DATA_TYPE_INT
);
initResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
,
(
int32_t
)
tableGroup
,
TSDB_DATA_TYPE_INT
);
if
(
pInfo
->
binfo
.
pRes
==
NULL
||
pInfo
->
binfo
.
pCtx
==
NULL
||
pInfo
->
binfo
.
resultRowInfo
.
pResult
==
NULL
)
{
goto
_clean
;
}
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pOperator
==
NULL
)
{
goto
_clean
;
}
pOperator
->
name
=
"MultiTableAggregate"
;
pOperator
->
name
=
"MultiTableAggregate"
;
pOperator
->
operatorType
=
OP_MultiTableAggregate
;
pOperator
->
operatorType
=
OP_MultiTableAggregate
;
pOperator
->
blockingOptr
=
true
;
pOperator
->
blockingOptr
=
true
;
...
@@ -6728,10 +7018,19 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SO
...
@@ -6728,10 +7018,19 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SO
appendUpstream
(
pOperator
,
upstream
);
appendUpstream
(
pOperator
,
upstream
);
return
pOperator
;
return
pOperator
;
_clean:
destroyAggOperatorInfo
((
void
*
)
pInfo
,
numOfOutput
);
tfree
(
pInfo
);
return
NULL
;
}
}
SOperatorInfo
*
createProjectOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
)
{
SOperatorInfo
*
createProjectOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
)
{
SProjectOperatorInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
SProjectOperatorInfo
));
SProjectOperatorInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
SProjectOperatorInfo
));
if
(
pInfo
==
NULL
)
{
return
NULL
;
}
pInfo
->
seed
=
rand
();
pInfo
->
seed
=
rand
();
pInfo
->
bufCapacity
=
pRuntimeEnv
->
resultInfo
.
capacity
;
pInfo
->
bufCapacity
=
pRuntimeEnv
->
resultInfo
.
capacity
;
...
@@ -6741,9 +7040,18 @@ SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
...
@@ -6741,9 +7040,18 @@ SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
pBInfo
->
pCtx
=
createSQLFunctionCtx
(
pRuntimeEnv
,
pExpr
,
numOfOutput
,
&
pBInfo
->
rowCellInfoOffset
);
pBInfo
->
pCtx
=
createSQLFunctionCtx
(
pRuntimeEnv
,
pExpr
,
numOfOutput
,
&
pBInfo
->
rowCellInfoOffset
);
initResultRowInfo
(
&
pBInfo
->
resultRowInfo
,
8
,
TSDB_DATA_TYPE_INT
);
initResultRowInfo
(
&
pBInfo
->
resultRowInfo
,
8
,
TSDB_DATA_TYPE_INT
);
if
(
pInfo
->
binfo
.
pRes
==
NULL
||
pInfo
->
binfo
.
pCtx
==
NULL
||
pBInfo
->
resultRowInfo
.
pResult
==
NULL
)
{
goto
_clean
;
}
setDefaultOutputBuf
(
pRuntimeEnv
,
pBInfo
,
pInfo
->
seed
,
MASTER_SCAN
);
setDefaultOutputBuf
(
pRuntimeEnv
,
pBInfo
,
pInfo
->
seed
,
MASTER_SCAN
);
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pOperator
==
NULL
)
{
goto
_clean
;
}
pOperator
->
name
=
"ProjectOperator"
;
pOperator
->
name
=
"ProjectOperator"
;
pOperator
->
operatorType
=
OP_Project
;
pOperator
->
operatorType
=
OP_Project
;
pOperator
->
blockingOptr
=
false
;
pOperator
->
blockingOptr
=
false
;
...
@@ -6758,6 +7066,12 @@ SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
...
@@ -6758,6 +7066,12 @@ SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
appendUpstream
(
pOperator
,
upstream
);
appendUpstream
(
pOperator
,
upstream
);
return
pOperator
;
return
pOperator
;
_clean:
destroyProjectOperatorInfo
((
void
*
)
pInfo
,
numOfOutput
);
tfree
(
pInfo
);
return
NULL
;
}
}
SColumnInfo
*
extractColumnFilterInfo
(
SExprInfo
*
pExpr
,
int32_t
numOfOutput
,
int32_t
*
numOfFilterCols
)
{
SColumnInfo
*
extractColumnFilterInfo
(
SExprInfo
*
pExpr
,
int32_t
numOfOutput
,
int32_t
*
numOfFilterCols
)
{
...
@@ -6792,12 +7106,18 @@ SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int3
...
@@ -6792,12 +7106,18 @@ SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int3
SOperatorInfo
*
createFilterOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
SOperatorInfo
*
createFilterOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
,
SColumnInfo
*
pCols
,
int32_t
numOfFilter
)
{
int32_t
numOfOutput
,
SColumnInfo
*
pCols
,
int32_t
numOfFilter
)
{
SFilterOperatorInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
SFilterOperatorInfo
));
SFilterOperatorInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
SFilterOperatorInfo
));
if
(
pInfo
==
NULL
)
{
return
NULL
;
}
assert
(
numOfFilter
>
0
&&
pCols
!=
NULL
);
assert
(
numOfFilter
>
0
&&
pCols
!=
NULL
);
doCreateFilterInfo
(
pCols
,
numOfOutput
,
numOfFilter
,
&
pInfo
->
pFilterInfo
,
0
);
doCreateFilterInfo
(
pCols
,
numOfOutput
,
numOfFilter
,
&
pInfo
->
pFilterInfo
,
0
);
pInfo
->
numOfFilterCols
=
numOfFilter
;
pInfo
->
numOfFilterCols
=
numOfFilter
;
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pOperator
==
NULL
)
{
goto
_clean
;
}
pOperator
->
name
=
"FilterOperator"
;
pOperator
->
name
=
"FilterOperator"
;
pOperator
->
operatorType
=
OP_Filter
;
pOperator
->
operatorType
=
OP_Filter
;
...
@@ -6812,13 +7132,27 @@ SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator
...
@@ -6812,13 +7132,27 @@ SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator
appendUpstream
(
pOperator
,
upstream
);
appendUpstream
(
pOperator
,
upstream
);
return
pOperator
;
return
pOperator
;
_clean:
destroyConditionOperatorInfo
((
void
*
)
pInfo
,
numOfOutput
);
tfree
(
pInfo
);
return
NULL
;
}
}
SOperatorInfo
*
createLimitOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
)
{
SOperatorInfo
*
createLimitOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
)
{
SLimitOperatorInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
SLimitOperatorInfo
));
SLimitOperatorInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
SLimitOperatorInfo
));
if
(
pInfo
==
NULL
)
{
return
NULL
;
}
pInfo
->
limit
=
pRuntimeEnv
->
pQueryAttr
->
limit
.
limit
;
pInfo
->
limit
=
pRuntimeEnv
->
pQueryAttr
->
limit
.
limit
;
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pOperator
==
NULL
)
{
tfree
(
pInfo
);
return
NULL
;
}
pOperator
->
name
=
"LimitOperator"
;
pOperator
->
name
=
"LimitOperator"
;
pOperator
->
operatorType
=
OP_Limit
;
pOperator
->
operatorType
=
OP_Limit
;
...
@@ -6834,12 +7168,23 @@ SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI
...
@@ -6834,12 +7168,23 @@ SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI
SOperatorInfo
*
createTimeIntervalOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
)
{
SOperatorInfo
*
createTimeIntervalOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
)
{
STableIntervalOperatorInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
STableIntervalOperatorInfo
));
STableIntervalOperatorInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
STableIntervalOperatorInfo
));
if
(
pInfo
==
NULL
)
{
return
NULL
;
}
pInfo
->
pCtx
=
createSQLFunctionCtx
(
pRuntimeEnv
,
pExpr
,
numOfOutput
,
&
pInfo
->
rowCellInfoOffset
);
pInfo
->
pCtx
=
createSQLFunctionCtx
(
pRuntimeEnv
,
pExpr
,
numOfOutput
,
&
pInfo
->
rowCellInfoOffset
);
pInfo
->
pRes
=
createOutputBuf
(
pExpr
,
numOfOutput
,
pRuntimeEnv
->
resultInfo
.
capacity
);
pInfo
->
pRes
=
createOutputBuf
(
pExpr
,
numOfOutput
,
pRuntimeEnv
->
resultInfo
.
capacity
);
initResultRowInfo
(
&
pInfo
->
resultRowInfo
,
8
,
TSDB_DATA_TYPE_INT
);
initResultRowInfo
(
&
pInfo
->
resultRowInfo
,
8
,
TSDB_DATA_TYPE_INT
);
if
(
pInfo
->
pRes
==
NULL
||
pInfo
->
pCtx
==
NULL
||
pInfo
->
resultRowInfo
.
pResult
==
NULL
)
{
goto
_clean
;
}
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pOperator
==
NULL
)
{
goto
_clean
;
}
pOperator
->
name
=
"TimeIntervalAggOperator"
;
pOperator
->
name
=
"TimeIntervalAggOperator"
;
pOperator
->
operatorType
=
OP_TimeWindow
;
pOperator
->
operatorType
=
OP_TimeWindow
;
...
@@ -6854,17 +7199,34 @@ SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOp
...
@@ -6854,17 +7199,34 @@ SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOp
appendUpstream
(
pOperator
,
upstream
);
appendUpstream
(
pOperator
,
upstream
);
return
pOperator
;
return
pOperator
;
_clean:
destroyBasicOperatorInfo
((
void
*
)
pInfo
,
numOfOutput
);
tfree
(
pInfo
);
return
NULL
;
}
}
SOperatorInfo
*
createAllTimeIntervalOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
)
{
SOperatorInfo
*
createAllTimeIntervalOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
)
{
STableIntervalOperatorInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
STableIntervalOperatorInfo
));
STableIntervalOperatorInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
STableIntervalOperatorInfo
));
if
(
pInfo
==
NULL
)
{
return
NULL
;
}
pInfo
->
pCtx
=
createSQLFunctionCtx
(
pRuntimeEnv
,
pExpr
,
numOfOutput
,
&
pInfo
->
rowCellInfoOffset
);
pInfo
->
pCtx
=
createSQLFunctionCtx
(
pRuntimeEnv
,
pExpr
,
numOfOutput
,
&
pInfo
->
rowCellInfoOffset
);
pInfo
->
pRes
=
createOutputBuf
(
pExpr
,
numOfOutput
,
pRuntimeEnv
->
resultInfo
.
capacity
);
pInfo
->
pRes
=
createOutputBuf
(
pExpr
,
numOfOutput
,
pRuntimeEnv
->
resultInfo
.
capacity
);
initResultRowInfo
(
&
pInfo
->
resultRowInfo
,
8
,
TSDB_DATA_TYPE_INT
);
initResultRowInfo
(
&
pInfo
->
resultRowInfo
,
8
,
TSDB_DATA_TYPE_INT
);
if
(
pInfo
->
pRes
==
NULL
||
pInfo
->
pCtx
==
NULL
||
pInfo
->
resultRowInfo
.
pResult
==
NULL
)
{
goto
_clean
;
}
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pOperator
==
NULL
)
{
goto
_clean
;
}
pOperator
->
name
=
"AllTimeIntervalAggOperator"
;
pOperator
->
name
=
"AllTimeIntervalAggOperator"
;
pOperator
->
operatorType
=
OP_AllTimeWindow
;
pOperator
->
operatorType
=
OP_AllTimeWindow
;
...
@@ -6879,17 +7241,36 @@ SOperatorInfo* createAllTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv,
...
@@ -6879,17 +7241,36 @@ SOperatorInfo* createAllTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv,
appendUpstream
(
pOperator
,
upstream
);
appendUpstream
(
pOperator
,
upstream
);
return
pOperator
;
return
pOperator
;
_clean:
destroyBasicOperatorInfo
((
void
*
)
pInfo
,
numOfOutput
);
tfree
(
pInfo
);
return
NULL
;
}
}
SOperatorInfo
*
createStatewindowOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
)
{
SOperatorInfo
*
createStatewindowOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
)
{
SStateWindowOperatorInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
SStateWindowOperatorInfo
));
SStateWindowOperatorInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
SStateWindowOperatorInfo
));
if
(
pInfo
==
NULL
)
{
return
NULL
;
}
pInfo
->
colIndex
=
-
1
;
pInfo
->
colIndex
=
-
1
;
pInfo
->
reptScan
=
false
;
pInfo
->
reptScan
=
false
;
pInfo
->
binfo
.
pCtx
=
createSQLFunctionCtx
(
pRuntimeEnv
,
pExpr
,
numOfOutput
,
&
pInfo
->
binfo
.
rowCellInfoOffset
);
pInfo
->
binfo
.
pCtx
=
createSQLFunctionCtx
(
pRuntimeEnv
,
pExpr
,
numOfOutput
,
&
pInfo
->
binfo
.
rowCellInfoOffset
);
pInfo
->
binfo
.
pRes
=
createOutputBuf
(
pExpr
,
numOfOutput
,
pRuntimeEnv
->
resultInfo
.
capacity
);
pInfo
->
binfo
.
pRes
=
createOutputBuf
(
pExpr
,
numOfOutput
,
pRuntimeEnv
->
resultInfo
.
capacity
);
initResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
,
8
,
TSDB_DATA_TYPE_INT
);
initResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
,
8
,
TSDB_DATA_TYPE_INT
);
if
(
pInfo
->
binfo
.
pRes
==
NULL
||
pInfo
->
binfo
.
pCtx
==
NULL
||
pInfo
->
binfo
.
resultRowInfo
.
pResult
==
NULL
)
{
goto
_clean
;
}
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pOperator
==
NULL
)
{
goto
_clean
;
}
pOperator
->
name
=
"StateWindowOperator"
;
pOperator
->
name
=
"StateWindowOperator"
;
pOperator
->
operatorType
=
OP_StateWindow
;
pOperator
->
operatorType
=
OP_StateWindow
;
pOperator
->
blockingOptr
=
true
;
pOperator
->
blockingOptr
=
true
;
...
@@ -6903,17 +7284,35 @@ SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpe
...
@@ -6903,17 +7284,35 @@ SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpe
appendUpstream
(
pOperator
,
upstream
);
appendUpstream
(
pOperator
,
upstream
);
return
pOperator
;
return
pOperator
;
_clean:
destroyStateWindowOperatorInfo
((
void
*
)
pInfo
,
numOfOutput
);
tfree
(
pInfo
);
return
NULL
;
}
}
SOperatorInfo
*
createSWindowOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
)
{
SOperatorInfo
*
createSWindowOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
)
{
SSWindowOperatorInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
SSWindowOperatorInfo
));
SSWindowOperatorInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
SSWindowOperatorInfo
));
if
(
pInfo
==
NULL
)
{
return
NULL
;
}
pInfo
->
binfo
.
pCtx
=
createSQLFunctionCtx
(
pRuntimeEnv
,
pExpr
,
numOfOutput
,
&
pInfo
->
binfo
.
rowCellInfoOffset
);
pInfo
->
binfo
.
pCtx
=
createSQLFunctionCtx
(
pRuntimeEnv
,
pExpr
,
numOfOutput
,
&
pInfo
->
binfo
.
rowCellInfoOffset
);
pInfo
->
binfo
.
pRes
=
createOutputBuf
(
pExpr
,
numOfOutput
,
pRuntimeEnv
->
resultInfo
.
capacity
);
pInfo
->
binfo
.
pRes
=
createOutputBuf
(
pExpr
,
numOfOutput
,
pRuntimeEnv
->
resultInfo
.
capacity
);
initResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
,
8
,
TSDB_DATA_TYPE_INT
);
initResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
,
8
,
TSDB_DATA_TYPE_INT
);
if
(
pInfo
->
binfo
.
pRes
==
NULL
||
pInfo
->
binfo
.
pCtx
==
NULL
||
pInfo
->
binfo
.
resultRowInfo
.
pResult
==
NULL
)
{
goto
_clean
;
}
pInfo
->
prevTs
=
INT64_MIN
;
pInfo
->
prevTs
=
INT64_MIN
;
pInfo
->
reptScan
=
false
;
pInfo
->
reptScan
=
false
;
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pOperator
==
NULL
)
{
goto
_clean
;
}
pOperator
->
name
=
"SessionWindowAggOperator"
;
pOperator
->
name
=
"SessionWindowAggOperator"
;
pOperator
->
operatorType
=
OP_SessionWindow
;
pOperator
->
operatorType
=
OP_SessionWindow
;
...
@@ -6928,16 +7327,34 @@ SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
...
@@ -6928,16 +7327,34 @@ SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
appendUpstream
(
pOperator
,
upstream
);
appendUpstream
(
pOperator
,
upstream
);
return
pOperator
;
return
pOperator
;
_clean:
destroySWindowOperatorInfo
((
void
*
)
pInfo
,
numOfOutput
);
tfree
(
pInfo
);
return
NULL
;
}
}
SOperatorInfo
*
createMultiTableTimeIntervalOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
)
{
SOperatorInfo
*
createMultiTableTimeIntervalOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
)
{
STableIntervalOperatorInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
STableIntervalOperatorInfo
));
STableIntervalOperatorInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
STableIntervalOperatorInfo
));
if
(
pInfo
==
NULL
)
{
return
NULL
;
}
pInfo
->
pCtx
=
createSQLFunctionCtx
(
pRuntimeEnv
,
pExpr
,
numOfOutput
,
&
pInfo
->
rowCellInfoOffset
);
pInfo
->
pCtx
=
createSQLFunctionCtx
(
pRuntimeEnv
,
pExpr
,
numOfOutput
,
&
pInfo
->
rowCellInfoOffset
);
pInfo
->
pRes
=
createOutputBuf
(
pExpr
,
numOfOutput
,
pRuntimeEnv
->
resultInfo
.
capacity
);
pInfo
->
pRes
=
createOutputBuf
(
pExpr
,
numOfOutput
,
pRuntimeEnv
->
resultInfo
.
capacity
);
initResultRowInfo
(
&
pInfo
->
resultRowInfo
,
8
,
TSDB_DATA_TYPE_INT
);
initResultRowInfo
(
&
pInfo
->
resultRowInfo
,
8
,
TSDB_DATA_TYPE_INT
);
if
(
pInfo
->
pRes
==
NULL
||
pInfo
->
pCtx
==
NULL
||
pInfo
->
resultRowInfo
.
pResult
==
NULL
)
{
goto
_clean
;
}
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pOperator
==
NULL
)
{
goto
_clean
;
}
pOperator
->
name
=
"MultiTableTimeIntervalOperator"
;
pOperator
->
name
=
"MultiTableTimeIntervalOperator"
;
pOperator
->
operatorType
=
OP_MultiTableTimeInterval
;
pOperator
->
operatorType
=
OP_MultiTableTimeInterval
;
pOperator
->
blockingOptr
=
true
;
pOperator
->
blockingOptr
=
true
;
...
@@ -6952,16 +7369,34 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRunti
...
@@ -6952,16 +7369,34 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRunti
appendUpstream
(
pOperator
,
upstream
);
appendUpstream
(
pOperator
,
upstream
);
return
pOperator
;
return
pOperator
;
_clean:
destroyBasicOperatorInfo
((
void
*
)
pInfo
,
numOfOutput
);
tfree
(
pInfo
);
return
NULL
;
}
}
SOperatorInfo
*
createAllMultiTableTimeIntervalOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
)
{
SOperatorInfo
*
createAllMultiTableTimeIntervalOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
)
{
STableIntervalOperatorInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
STableIntervalOperatorInfo
));
STableIntervalOperatorInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
STableIntervalOperatorInfo
));
if
(
pInfo
==
NULL
)
{
return
NULL
;
}
pInfo
->
pCtx
=
createSQLFunctionCtx
(
pRuntimeEnv
,
pExpr
,
numOfOutput
,
&
pInfo
->
rowCellInfoOffset
);
pInfo
->
pCtx
=
createSQLFunctionCtx
(
pRuntimeEnv
,
pExpr
,
numOfOutput
,
&
pInfo
->
rowCellInfoOffset
);
pInfo
->
pRes
=
createOutputBuf
(
pExpr
,
numOfOutput
,
pRuntimeEnv
->
resultInfo
.
capacity
);
pInfo
->
pRes
=
createOutputBuf
(
pExpr
,
numOfOutput
,
pRuntimeEnv
->
resultInfo
.
capacity
);
initResultRowInfo
(
&
pInfo
->
resultRowInfo
,
8
,
TSDB_DATA_TYPE_INT
);
initResultRowInfo
(
&
pInfo
->
resultRowInfo
,
8
,
TSDB_DATA_TYPE_INT
);
if
(
pInfo
->
pRes
==
NULL
||
pInfo
->
pCtx
==
NULL
||
pInfo
->
resultRowInfo
.
pResult
==
NULL
)
{
goto
_clean
;
}
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pOperator
==
NULL
)
{
goto
_clean
;
}
pOperator
->
name
=
"AllMultiTableTimeIntervalOperator"
;
pOperator
->
name
=
"AllMultiTableTimeIntervalOperator"
;
pOperator
->
operatorType
=
OP_AllMultiTableTimeInterval
;
pOperator
->
operatorType
=
OP_AllMultiTableTimeInterval
;
pOperator
->
blockingOptr
=
true
;
pOperator
->
blockingOptr
=
true
;
...
@@ -6977,14 +7412,22 @@ SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRu
...
@@ -6977,14 +7412,22 @@ SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRu
appendUpstream
(
pOperator
,
upstream
);
appendUpstream
(
pOperator
,
upstream
);
return
pOperator
;
return
pOperator
;
_clean:
destroyBasicOperatorInfo
((
void
*
)
pInfo
,
numOfOutput
);
tfree
(
pInfo
);
return
NULL
;
}
}
SOperatorInfo
*
createGroupbyOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
)
{
SOperatorInfo
*
createGroupbyOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
)
{
SGroupbyOperatorInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
SGroupbyOperatorInfo
));
SGroupbyOperatorInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
SGroupbyOperatorInfo
));
pInfo
->
colIndex
=
-
1
;
// group by column index
if
(
pInfo
==
NULL
)
{
return
NULL
;
}
pInfo
->
colIndex
=
-
1
;
// group by column index
pInfo
->
binfo
.
pCtx
=
createSQLFunctionCtx
(
pRuntimeEnv
,
pExpr
,
numOfOutput
,
&
pInfo
->
binfo
.
rowCellInfoOffset
);
pInfo
->
binfo
.
pCtx
=
createSQLFunctionCtx
(
pRuntimeEnv
,
pExpr
,
numOfOutput
,
&
pInfo
->
binfo
.
rowCellInfoOffset
);
SQueryAttr
*
pQueryAttr
=
pRuntimeEnv
->
pQueryAttr
;
SQueryAttr
*
pQueryAttr
=
pRuntimeEnv
->
pQueryAttr
;
...
@@ -6993,9 +7436,18 @@ SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
...
@@ -6993,9 +7436,18 @@ SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
(
int32_t
)(
getRowNumForMultioutput
(
pQueryAttr
,
pQueryAttr
->
topBotQuery
,
pQueryAttr
->
stableQuery
)));
(
int32_t
)(
getRowNumForMultioutput
(
pQueryAttr
,
pQueryAttr
->
topBotQuery
,
pQueryAttr
->
stableQuery
)));
pInfo
->
binfo
.
pRes
=
createOutputBuf
(
pExpr
,
numOfOutput
,
pRuntimeEnv
->
resultInfo
.
capacity
);
pInfo
->
binfo
.
pRes
=
createOutputBuf
(
pExpr
,
numOfOutput
,
pRuntimeEnv
->
resultInfo
.
capacity
);
initResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
,
8
,
TSDB_DATA_TYPE_INT
);
initResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
,
8
,
TSDB_DATA_TYPE_INT
);
if
(
pInfo
->
binfo
.
pCtx
==
NULL
||
pInfo
->
binfo
.
pRes
==
NULL
||
pInfo
->
binfo
.
resultRowInfo
.
pResult
==
NULL
)
{
goto
_clean
;
}
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pOperator
==
NULL
)
{
goto
_clean
;
}
pOperator
->
name
=
"GroupbyAggOperator"
;
pOperator
->
name
=
"GroupbyAggOperator"
;
pOperator
->
blockingOptr
=
true
;
pOperator
->
blockingOptr
=
true
;
pOperator
->
status
=
OP_IN_EXECUTING
;
pOperator
->
status
=
OP_IN_EXECUTING
;
...
@@ -7009,16 +7461,34 @@ SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
...
@@ -7009,16 +7461,34 @@ SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
appendUpstream
(
pOperator
,
upstream
);
appendUpstream
(
pOperator
,
upstream
);
return
pOperator
;
return
pOperator
;
_clean:
destroyGroupbyOperatorInfo
((
void
*
)
pInfo
,
numOfOutput
);
tfree
(
pInfo
);
return
NULL
;
}
}
SOperatorInfo
*
createFillOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
,
bool
multigroupResult
)
{
SOperatorInfo
*
createFillOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
,
bool
multigroupResult
)
{
SFillOperatorInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
SFillOperatorInfo
));
SFillOperatorInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
SFillOperatorInfo
));
if
(
pInfo
==
NULL
)
{
return
NULL
;
}
pInfo
->
pRes
=
createOutputBuf
(
pExpr
,
numOfOutput
,
pRuntimeEnv
->
resultInfo
.
capacity
);
pInfo
->
pRes
=
createOutputBuf
(
pExpr
,
numOfOutput
,
pRuntimeEnv
->
resultInfo
.
capacity
);
if
(
pInfo
->
pRes
==
NULL
)
{
goto
_clean
;
}
pInfo
->
multigroupResult
=
multigroupResult
;
pInfo
->
multigroupResult
=
multigroupResult
;
{
{
SQueryAttr
*
pQueryAttr
=
pRuntimeEnv
->
pQueryAttr
;
SQueryAttr
*
pQueryAttr
=
pRuntimeEnv
->
pQueryAttr
;
SFillColInfo
*
pColInfo
=
createFillColInfo
(
pExpr
,
numOfOutput
,
pQueryAttr
->
fillVal
);
SFillColInfo
*
pColInfo
=
createFillColInfo
(
pExpr
,
numOfOutput
,
pQueryAttr
->
fillVal
);
if
(
pColInfo
==
NULL
)
{
goto
_clean
;
}
STimeWindow
w
=
TSWINDOW_INITIALIZER
;
STimeWindow
w
=
TSWINDOW_INITIALIZER
;
TSKEY
sk
=
MIN
(
pQueryAttr
->
window
.
skey
,
pQueryAttr
->
window
.
ekey
);
TSKEY
sk
=
MIN
(
pQueryAttr
->
window
.
skey
,
pQueryAttr
->
window
.
ekey
);
...
@@ -7029,11 +7499,20 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn
...
@@ -7029,11 +7499,20 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn
taosCreateFillInfo
(
pQueryAttr
->
order
.
order
,
w
.
skey
,
0
,
(
int32_t
)
pRuntimeEnv
->
resultInfo
.
capacity
,
numOfOutput
,
taosCreateFillInfo
(
pQueryAttr
->
order
.
order
,
w
.
skey
,
0
,
(
int32_t
)
pRuntimeEnv
->
resultInfo
.
capacity
,
numOfOutput
,
pQueryAttr
->
interval
.
sliding
,
pQueryAttr
->
interval
.
slidingUnit
,
pQueryAttr
->
interval
.
sliding
,
pQueryAttr
->
interval
.
slidingUnit
,
(
int8_t
)
pQueryAttr
->
precision
,
pQueryAttr
->
fillType
,
pColInfo
,
pRuntimeEnv
->
qinfo
);
(
int8_t
)
pQueryAttr
->
precision
,
pQueryAttr
->
fillType
,
pColInfo
,
pRuntimeEnv
->
qinfo
);
if
(
pInfo
->
pFillInfo
==
NULL
)
{
goto
_clean
;
}
pInfo
->
p
=
calloc
(
pInfo
->
pFillInfo
->
numOfCols
,
POINTER_BYTES
);
pInfo
->
p
=
calloc
(
pInfo
->
pFillInfo
->
numOfCols
,
POINTER_BYTES
);
if
(
pInfo
->
p
==
NULL
)
{
goto
_clean
;
}
}
}
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pOperator
==
NULL
)
{
goto
_clean
;
}
pOperator
->
name
=
"FillOperator"
;
pOperator
->
name
=
"FillOperator"
;
pOperator
->
blockingOptr
=
false
;
pOperator
->
blockingOptr
=
false
;
...
@@ -7048,14 +7527,27 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn
...
@@ -7048,14 +7527,27 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn
appendUpstream
(
pOperator
,
upstream
);
appendUpstream
(
pOperator
,
upstream
);
return
pOperator
;
return
pOperator
;
_clean:
destroySFillOperatorInfo
((
void
*
)
pInfo
,
numOfOutput
);
tfree
(
pInfo
);
return
NULL
;
}
}
SOperatorInfo
*
createSLimitOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
,
void
*
pMerger
,
bool
multigroupResult
)
{
SOperatorInfo
*
createSLimitOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
,
void
*
pMerger
,
bool
multigroupResult
)
{
SSLimitOperatorInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
SSLimitOperatorInfo
));
SSLimitOperatorInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
SSLimitOperatorInfo
));
if
(
pInfo
==
NULL
)
{
return
NULL
;
}
SQueryAttr
*
pQueryAttr
=
pRuntimeEnv
->
pQueryAttr
;
SQueryAttr
*
pQueryAttr
=
pRuntimeEnv
->
pQueryAttr
;
pInfo
->
orderColumnList
=
getResultGroupCheckColumns
(
pQueryAttr
);
pInfo
->
orderColumnList
=
getResultGroupCheckColumns
(
pQueryAttr
);
if
(
pInfo
->
orderColumnList
==
NULL
)
{
goto
_clean
;
}
pInfo
->
slimit
=
pQueryAttr
->
slimit
;
pInfo
->
slimit
=
pQueryAttr
->
slimit
;
pInfo
->
limit
=
pQueryAttr
->
limit
;
pInfo
->
limit
=
pQueryAttr
->
limit
;
pInfo
->
capacity
=
pRuntimeEnv
->
resultInfo
.
capacity
;
pInfo
->
capacity
=
pRuntimeEnv
->
resultInfo
.
capacity
;
...
@@ -7072,6 +7564,9 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator
...
@@ -7072,6 +7564,9 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator
int32_t
numOfCols
=
(
pInfo
->
orderColumnList
!=
NULL
)
?
(
int32_t
)
taosArrayGetSize
(
pInfo
->
orderColumnList
)
:
0
;
int32_t
numOfCols
=
(
pInfo
->
orderColumnList
!=
NULL
)
?
(
int32_t
)
taosArrayGetSize
(
pInfo
->
orderColumnList
)
:
0
;
pInfo
->
prevRow
=
calloc
(
1
,
(
POINTER_BYTES
*
numOfCols
+
len
));
pInfo
->
prevRow
=
calloc
(
1
,
(
POINTER_BYTES
*
numOfCols
+
len
));
if
(
pInfo
->
prevRow
==
NULL
)
{
goto
_clean
;
}
int32_t
offset
=
POINTER_BYTES
*
numOfCols
;
int32_t
offset
=
POINTER_BYTES
*
numOfCols
;
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
...
@@ -7083,7 +7578,14 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator
...
@@ -7083,7 +7578,14 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator
pInfo
->
pRes
=
createOutputBuf
(
pExpr
,
numOfOutput
,
pRuntimeEnv
->
resultInfo
.
capacity
);
pInfo
->
pRes
=
createOutputBuf
(
pExpr
,
numOfOutput
,
pRuntimeEnv
->
resultInfo
.
capacity
);
if
(
pInfo
->
pRes
==
NULL
)
{
goto
_clean
;
}
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pOperator
==
NULL
)
{
goto
_clean
;
}
pOperator
->
name
=
"SLimitOperator"
;
pOperator
->
name
=
"SLimitOperator"
;
pOperator
->
operatorType
=
OP_SLimit
;
pOperator
->
operatorType
=
OP_SLimit
;
...
@@ -7096,6 +7598,12 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator
...
@@ -7096,6 +7598,12 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator
appendUpstream
(
pOperator
,
upstream
);
appendUpstream
(
pOperator
,
upstream
);
return
pOperator
;
return
pOperator
;
_clean:
destroySlimitOperatorInfo
((
void
*
)
pInfo
,
numOfOutput
);
tfree
(
pInfo
);
return
NULL
;
}
}
static
SSDataBlock
*
doTagScan
(
void
*
param
,
bool
*
newgroup
)
{
static
SSDataBlock
*
doTagScan
(
void
*
param
,
bool
*
newgroup
)
{
...
@@ -7226,8 +7734,16 @@ static SSDataBlock* doTagScan(void* param, bool* newgroup) {
...
@@ -7226,8 +7734,16 @@ static SSDataBlock* doTagScan(void* param, bool* newgroup) {
SOperatorInfo
*
createTagScanOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
)
{
SOperatorInfo
*
createTagScanOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
)
{
STagScanInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
STagScanInfo
));
STagScanInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
STagScanInfo
));
if
(
pInfo
==
NULL
)
{
return
NULL
;
}
pInfo
->
pRes
=
createOutputBuf
(
pExpr
,
numOfOutput
,
pRuntimeEnv
->
resultInfo
.
capacity
);
pInfo
->
pRes
=
createOutputBuf
(
pExpr
,
numOfOutput
,
pRuntimeEnv
->
resultInfo
.
capacity
);
if
(
pInfo
->
pRes
==
NULL
)
{
goto
_clean
;
}
size_t
numOfGroup
=
GET_NUM_OF_TABLEGROUP
(
pRuntimeEnv
);
size_t
numOfGroup
=
GET_NUM_OF_TABLEGROUP
(
pRuntimeEnv
);
assert
(
numOfGroup
==
0
||
numOfGroup
==
1
);
assert
(
numOfGroup
==
0
||
numOfGroup
==
1
);
...
@@ -7235,6 +7751,10 @@ SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInf
...
@@ -7235,6 +7751,10 @@ SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInf
pInfo
->
curPos
=
0
;
pInfo
->
curPos
=
0
;
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pOperator
==
NULL
)
{
goto
_clean
;
}
pOperator
->
name
=
"SeqTableTagScan"
;
pOperator
->
name
=
"SeqTableTagScan"
;
pOperator
->
operatorType
=
OP_TagScan
;
pOperator
->
operatorType
=
OP_TagScan
;
pOperator
->
blockingOptr
=
false
;
pOperator
->
blockingOptr
=
false
;
...
@@ -7247,7 +7767,14 @@ SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInf
...
@@ -7247,7 +7767,14 @@ SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInf
pOperator
->
cleanup
=
destroyTagScanOperatorInfo
;
pOperator
->
cleanup
=
destroyTagScanOperatorInfo
;
return
pOperator
;
return
pOperator
;
_clean:
destroyTagScanOperatorInfo
((
void
*
)
pInfo
,
numOfOutput
);
tfree
(
pInfo
);
return
NULL
;
}
}
static
bool
initMultiDistinctInfo
(
SDistinctOperatorInfo
*
pInfo
,
SOperatorInfo
*
pOperator
,
SSDataBlock
*
pBlock
)
{
static
bool
initMultiDistinctInfo
(
SDistinctOperatorInfo
*
pInfo
,
SOperatorInfo
*
pOperator
,
SSDataBlock
*
pBlock
)
{
if
(
taosArrayGetSize
(
pInfo
->
pDistinctDataInfo
)
==
pOperator
->
numOfOutput
)
{
if
(
taosArrayGetSize
(
pInfo
->
pDistinctDataInfo
)
==
pOperator
->
numOfOutput
)
{
// distinct info already inited
// distinct info already inited
...
@@ -7361,6 +7888,9 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) {
...
@@ -7361,6 +7888,9 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) {
SOperatorInfo
*
createDistinctOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
)
{
SOperatorInfo
*
createDistinctOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
)
{
SDistinctOperatorInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
SDistinctOperatorInfo
));
SDistinctOperatorInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
SDistinctOperatorInfo
));
if
(
pInfo
==
NULL
)
{
return
NULL
;
}
pInfo
->
totalBytes
=
0
;
pInfo
->
totalBytes
=
0
;
pInfo
->
buf
=
NULL
;
pInfo
->
buf
=
NULL
;
...
@@ -7370,7 +7900,15 @@ SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperat
...
@@ -7370,7 +7900,15 @@ SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperat
pInfo
->
pSet
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
false
,
HASH_NO_LOCK
);
pInfo
->
pSet
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
false
,
HASH_NO_LOCK
);
pInfo
->
pRes
=
createOutputBuf
(
pExpr
,
numOfOutput
,
(
int32_t
)
pInfo
->
outputCapacity
);
pInfo
->
pRes
=
createOutputBuf
(
pExpr
,
numOfOutput
,
(
int32_t
)
pInfo
->
outputCapacity
);
if
(
pInfo
->
pDistinctDataInfo
==
NULL
||
pInfo
->
pSet
==
NULL
||
pInfo
->
pRes
==
NULL
)
{
goto
_clean
;
}
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pOperator
==
NULL
)
{
goto
_clean
;
}
pOperator
->
name
=
"DistinctOperator"
;
pOperator
->
name
=
"DistinctOperator"
;
pOperator
->
blockingOptr
=
false
;
pOperator
->
blockingOptr
=
false
;
pOperator
->
status
=
OP_IN_EXECUTING
;
pOperator
->
status
=
OP_IN_EXECUTING
;
...
@@ -7385,6 +7923,12 @@ SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperat
...
@@ -7385,6 +7923,12 @@ SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperat
appendUpstream
(
pOperator
,
upstream
);
appendUpstream
(
pOperator
,
upstream
);
return
pOperator
;
return
pOperator
;
_clean:
destroyDistinctOperatorInfo
((
void
*
)
pInfo
,
numOfOutput
);
tfree
(
pInfo
);
return
NULL
;
}
}
static
int32_t
getColumnIndexInSource
(
SQueriedTableInfo
*
pTableInfo
,
SSqlExpr
*
pExpr
,
SColumnInfo
*
pTagCols
)
{
static
int32_t
getColumnIndexInSource
(
SQueriedTableInfo
*
pTableInfo
,
SSqlExpr
*
pExpr
,
SColumnInfo
*
pTagCols
)
{
...
...
src/query/src/qFill.c
浏览文件 @
afe03dc9
...
@@ -353,6 +353,10 @@ SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int3
...
@@ -353,6 +353,10 @@ SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int3
}
}
SFillInfo
*
pFillInfo
=
calloc
(
1
,
sizeof
(
SFillInfo
));
SFillInfo
*
pFillInfo
=
calloc
(
1
,
sizeof
(
SFillInfo
));
if
(
pFillInfo
==
NULL
)
{
return
NULL
;
}
taosResetFillInfo
(
pFillInfo
,
skey
);
taosResetFillInfo
(
pFillInfo
,
skey
);
pFillInfo
->
order
=
order
;
pFillInfo
->
order
=
order
;
...
@@ -370,6 +374,10 @@ SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int3
...
@@ -370,6 +374,10 @@ SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int3
pFillInfo
->
interval
.
slidingUnit
=
slidingUnit
;
pFillInfo
->
interval
.
slidingUnit
=
slidingUnit
;
pFillInfo
->
pData
=
malloc
(
POINTER_BYTES
*
numOfCols
);
pFillInfo
->
pData
=
malloc
(
POINTER_BYTES
*
numOfCols
);
if
(
pFillInfo
->
pData
==
NULL
)
{
tfree
(
pFillInfo
);
return
NULL
;
}
// if (numOfTags > 0) {
// if (numOfTags > 0) {
pFillInfo
->
pTags
=
calloc
(
numOfCols
,
sizeof
(
SFillTagColInfo
));
pFillInfo
->
pTags
=
calloc
(
numOfCols
,
sizeof
(
SFillTagColInfo
));
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录