Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
36c4b981
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
36c4b981
编写于
9月 14, 2022
作者:
H
Haojun Liao
提交者:
GitHub
9月 14, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #16813 from taosdata/feature/3_liaohj
refactor(query): do some internal refactor.
上级
6263c8fc
00908a64
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
153 addition
and
469 deletion
+153
-469
source/libs/executor/inc/executil.h
source/libs/executor/inc/executil.h
+1
-0
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+8
-34
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+19
-2
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+37
-332
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+88
-101
未找到文件。
source/libs/executor/inc/executil.h
浏览文件 @
36c4b981
...
...
@@ -88,6 +88,7 @@ struct SqlFunctionCtx;
size_t
getResultRowSize
(
struct
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
);
void
initResultRowInfo
(
SResultRowInfo
*
pResultRowInfo
);
void
closeResultRow
(
SResultRow
*
pResultRow
);
void
resetResultRow
(
SResultRow
*
pResultRow
,
size_t
entrySize
);
struct
SResultRowEntryInfo
*
getResultEntryInfo
(
const
SResultRow
*
pRow
,
int32_t
index
,
const
int32_t
*
offset
);
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
36c4b981
...
...
@@ -585,11 +585,12 @@ typedef struct SIntervalAggOperatorInfo {
typedef
struct
SMergeAlignedIntervalAggOperatorInfo
{
SIntervalAggOperatorInfo
*
intervalAggOperatorInfo
;
bool
hasGroupId
;
//
bool hasGroupId;
uint64_t
groupId
;
// current groupId
int64_t
curTs
;
// current ts
SSDataBlock
*
prefetchedBlock
;
SNode
*
pCondition
;
SResultRow
*
pResultRow
;
}
SMergeAlignedIntervalAggOperatorInfo
;
typedef
struct
SStreamIntervalOperatorInfo
{
...
...
@@ -649,7 +650,6 @@ typedef struct SAggOperatorInfo {
}
SAggOperatorInfo
;
typedef
struct
SProjectOperatorInfo
{
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo
binfo
;
SAggSupporter
aggSup
;
SNode
*
pFilterNode
;
// filter info, which is push down by optimizer
...
...
@@ -691,7 +691,6 @@ typedef struct SFillOperatorInfo {
}
SFillOperatorInfo
;
typedef
struct
SGroupbyOperatorInfo
{
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo
binfo
;
SAggSupporter
aggSup
;
...
...
@@ -738,7 +737,6 @@ typedef struct SWindowRowsSup {
}
SWindowRowsSup
;
typedef
struct
SSessionAggOperatorInfo
{
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo
binfo
;
SAggSupporter
aggSup
;
...
...
@@ -827,7 +825,6 @@ typedef struct SStateWindowOperatorInfo {
SStateKeys
stateKey
;
int32_t
tsSlotId
;
// primary timestamp column slot id
STimeWindowAggSupp
twAggSup
;
// bool reptScan;
const
SNode
*
pCondition
;
}
SStateWindowOperatorInfo
;
...
...
@@ -848,24 +845,6 @@ typedef struct SStreamStateAggOperatorInfo {
bool
ignoreExpiredData
;
}
SStreamStateAggOperatorInfo
;
typedef
struct
SSortedMergeOperatorInfo
{
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo
binfo
;
SAggSupporter
aggSup
;
SArray
*
pSortInfo
;
int32_t
numOfSources
;
SSortHandle
*
pSortHandle
;
int32_t
bufPageSize
;
uint32_t
sortBufSize
;
// max buffer size for in-memory sort
int32_t
resultRowFactor
;
bool
hasGroupVal
;
SDiskbasedBuf
*
pTupleStore
;
// keep the final results
int32_t
numOfResPerPage
;
char
**
groupVal
;
SArray
*
groupInfo
;
}
SSortedMergeOperatorInfo
;
typedef
struct
SSortOperatorInfo
{
SOptrBasicInfo
binfo
;
uint32_t
sortBufSize
;
// max buffer size for in-memory sort
...
...
@@ -873,11 +852,10 @@ typedef struct SSortOperatorInfo {
SSortHandle
*
pSortHandle
;
SArray
*
pColMatchInfo
;
// for index map from table scan output
int32_t
bufPageSize
;
int64_t
startTs
;
// sort start time
uint64_t
sortElapsed
;
// sort elapsed time, time to flush to disk not included.
SLimitInfo
limitInfo
;
SNode
*
pCondition
;
int64_t
startTs
;
// sort start time
uint64_t
sortElapsed
;
// sort elapsed time, time to flush to disk not included.
SLimitInfo
limitInfo
;
SNode
*
pCondition
;
}
SSortOperatorInfo
;
typedef
struct
STagFilterOperatorInfo
{
...
...
@@ -909,7 +887,6 @@ SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn,
__optr_decode_fn_t
decode
,
__optr_explain_fn_t
explain
);
int32_t
operatorDummyOpenFn
(
SOperatorInfo
*
pOperator
);
void
operatorDummyCloseFn
(
void
*
param
,
int32_t
numOfCols
);
int32_t
appendDownstream
(
SOperatorInfo
*
p
,
SOperatorInfo
**
pDownstream
,
int32_t
num
);
void
initBasicInfo
(
SOptrBasicInfo
*
pInfo
,
SSDataBlock
*
pBlock
);
...
...
@@ -944,7 +921,6 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int
SSDataBlock
*
pBlock
,
const
char
*
idStr
);
void
cleanupAggSup
(
SAggSupporter
*
pAggSup
);
void
destroyBasicOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
);
void
appendOneRowToDataBlock
(
SSDataBlock
*
pBlock
,
STupleHandle
*
pTupleHandle
);
void
setTbNameColData
(
void
*
pMeta
,
const
SSDataBlock
*
pBlock
,
SColumnInfoData
*
pColInfoData
,
int32_t
functionId
);
...
...
@@ -1091,10 +1067,8 @@ void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEn
void
printDataBlock
(
SSDataBlock
*
pBlock
,
const
char
*
flag
);
uint64_t
calGroupIdByData
(
SPartitionBySupporter
*
pParSup
,
SExprSupp
*
pExprSup
,
SSDataBlock
*
pBlock
,
int32_t
rowId
);
int32_t
finalizeResultRowIntoResultDataBlock
(
SDiskbasedBuf
*
pBuf
,
SResultRowPosition
*
resultRowPosition
,
SqlFunctionCtx
*
pCtx
,
SExprInfo
*
pExprInfo
,
int32_t
numOfExprs
,
const
int32_t
*
rowCellOffset
,
SSDataBlock
*
pBlock
,
SExecTaskInfo
*
pTaskInfo
);
int32_t
finalizeResultRows
(
SDiskbasedBuf
*
pBuf
,
SResultRowPosition
*
resultRowPosition
,
SExprSupp
*
pSup
,
SSDataBlock
*
pBlock
,
SExecTaskInfo
*
pTaskInfo
);
int32_t
createScanTableListInfo
(
SScanPhysiNode
*
pScanNode
,
SNodeList
*
pGroupTags
,
bool
groupSort
,
SReadHandle
*
pHandle
,
STableListInfo
*
pTableListInfo
,
SNode
*
pTagCond
,
SNode
*
pTagIndexCond
,
...
...
source/libs/executor/src/executil.c
浏览文件 @
36c4b981
...
...
@@ -33,6 +33,17 @@ void initResultRowInfo(SResultRowInfo* pResultRowInfo) {
void
closeResultRow
(
SResultRow
*
pResultRow
)
{
pResultRow
->
closed
=
true
;
}
void
resetResultRow
(
SResultRow
*
pResultRow
,
size_t
entrySize
)
{
pResultRow
->
numOfRows
=
0
;
pResultRow
->
closed
=
false
;
pResultRow
->
endInterp
=
false
;
pResultRow
->
startInterp
=
false
;
if
(
entrySize
>
0
)
{
memset
(
pResultRow
->
pEntryInfo
,
0
,
entrySize
);
}
}
// TODO refactor: use macro
SResultRowEntryInfo
*
getResultEntryInfo
(
const
SResultRow
*
pRow
,
int32_t
index
,
const
int32_t
*
offset
)
{
assert
(
index
>=
0
&&
offset
!=
NULL
);
...
...
@@ -799,9 +810,15 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
taosMemoryFreeClear
(
pColInfoData
);
}
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
res
);
i
++
)
{
size_t
numOfTables
=
taosArrayGetSize
(
res
);
for
(
int
i
=
0
;
i
<
numOfTables
;
i
++
)
{
STableKeyInfo
info
=
{.
uid
=
*
(
uint64_t
*
)
taosArrayGet
(
res
,
i
),
.
groupId
=
0
};
taosArrayPush
(
pListInfo
->
pTableList
,
&
info
);
void
*
p
=
taosArrayPush
(
pListInfo
->
pTableList
,
&
info
);
if
(
p
==
NULL
)
{
taosArrayDestroy
(
res
);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
qDebug
(
"tagfilter get uid:%ld"
,
info
.
uid
);
}
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
36c4b981
...
...
@@ -132,8 +132,6 @@ SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn,
return
fpSet
;
}
void
operatorDummyCloseFn
(
void
*
param
,
int32_t
numOfCols
)
{}
static
int32_t
doCopyToSDataBlock
(
SExecTaskInfo
*
pTaskInfo
,
SSDataBlock
*
pBlock
,
SExprSupp
*
pSup
,
SDiskbasedBuf
*
pBuf
,
SGroupResInfo
*
pGroupResInfo
);
...
...
@@ -1269,33 +1267,12 @@ static void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t nu
}
}
// todo extract method with copytoSSDataBlock
int32_t
finalizeResultRowIntoResultDataBlock
(
SDiskbasedBuf
*
pBuf
,
SResultRowPosition
*
resultRowPosition
,
SqlFunctionCtx
*
pCtx
,
SExprInfo
*
pExprInfo
,
int32_t
numOfExprs
,
const
int32_t
*
rowCellOffset
,
SSDataBlock
*
pBlock
,
SExecTaskInfo
*
pTaskInfo
)
{
SFilePage
*
page
=
getBufPage
(
pBuf
,
resultRowPosition
->
pageId
);
SResultRow
*
pRow
=
(
SResultRow
*
)((
char
*
)
page
+
resultRowPosition
->
offset
);
doUpdateNumOfRows
(
pCtx
,
pRow
,
numOfExprs
,
rowCellOffset
);
if
(
pRow
->
numOfRows
==
0
)
{
releaseBufPage
(
pBuf
,
page
);
return
0
;
}
while
(
pBlock
->
info
.
rows
+
pRow
->
numOfRows
>
pBlock
->
info
.
capacity
)
{
int32_t
code
=
blockDataEnsureCapacity
(
pBlock
,
pBlock
->
info
.
capacity
*
1
.
25
);
if
(
TAOS_FAILED
(
code
))
{
releaseBufPage
(
pBuf
,
page
);
qError
(
"%s ensure result data capacity failed, code %s"
,
GET_TASKID
(
pTaskInfo
),
tstrerror
(
code
));
T_LONG_JMP
(
pTaskInfo
->
env
,
code
);
}
}
static
void
doCopyResultToDataBlock
(
SExprInfo
*
pExprInfo
,
int32_t
numOfExprs
,
SResultRow
*
pRow
,
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
,
const
int32_t
*
rowEntryOffset
,
SExecTaskInfo
*
pTaskInfo
)
{
for
(
int32_t
j
=
0
;
j
<
numOfExprs
;
++
j
)
{
int32_t
slotId
=
pExprInfo
[
j
].
base
.
resSchema
.
slotId
;
pCtx
[
j
].
resultInfo
=
getResultEntryInfo
(
pRow
,
j
,
row
Cell
Offset
);
pCtx
[
j
].
resultInfo
=
getResultEntryInfo
(
pRow
,
j
,
row
Entry
Offset
);
if
(
pCtx
[
j
].
fpSet
.
finalize
)
{
int32_t
code
=
pCtx
[
j
].
fpSet
.
finalize
(
&
pCtx
[
j
],
pBlock
);
if
(
TAOS_FAILED
(
code
))
{
...
...
@@ -1303,7 +1280,7 @@ int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosi
T_LONG_JMP
(
pTaskInfo
->
env
,
code
);
}
}
else
if
(
strcmp
(
pCtx
[
j
].
pExpr
->
pExpr
->
_function
.
functionName
,
"_select_value"
)
==
0
)
{
// do nothing
, todo refactor
// do nothing
}
else
{
// expand the result into multiple rows. E.g., _wstart, top(k, 20)
// the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
...
...
@@ -1314,10 +1291,40 @@ int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosi
}
}
}
}
// todo refactor. SResultRow has direct pointer in miainfo
int32_t
finalizeResultRows
(
SDiskbasedBuf
*
pBuf
,
SResultRowPosition
*
resultRowPosition
,
SExprSupp
*
pSup
,
SSDataBlock
*
pBlock
,
SExecTaskInfo
*
pTaskInfo
)
{
SFilePage
*
page
=
getBufPage
(
pBuf
,
resultRowPosition
->
pageId
);
SResultRow
*
pRow
=
(
SResultRow
*
)((
char
*
)
page
+
resultRowPosition
->
offset
);
SqlFunctionCtx
*
pCtx
=
pSup
->
pCtx
;
SExprInfo
*
pExprInfo
=
pSup
->
pExprInfo
;
const
int32_t
*
rowEntryOffset
=
pSup
->
rowEntryInfoOffset
;
doUpdateNumOfRows
(
pCtx
,
pRow
,
pSup
->
numOfExprs
,
rowEntryOffset
);
if
(
pRow
->
numOfRows
==
0
)
{
releaseBufPage
(
pBuf
,
page
);
return
0
;
}
int32_t
size
=
pBlock
->
info
.
capacity
;
while
(
pBlock
->
info
.
rows
+
pRow
->
numOfRows
>
size
)
{
size
=
size
*
1
.
25
;
}
int32_t
code
=
blockDataEnsureCapacity
(
pBlock
,
size
);
if
(
TAOS_FAILED
(
code
))
{
releaseBufPage
(
pBuf
,
page
);
qError
(
"%s ensure result data capacity failed, code %s"
,
GET_TASKID
(
pTaskInfo
),
tstrerror
(
code
));
T_LONG_JMP
(
pTaskInfo
->
env
,
code
);
}
doCopyResultToDataBlock
(
pExprInfo
,
pSup
->
numOfExprs
,
pRow
,
pCtx
,
pBlock
,
rowEntryOffset
,
pTaskInfo
);
releaseBufPage
(
pBuf
,
page
);
pBlock
->
info
.
rows
+=
pRow
->
numOfRows
;
return
0
;
}
...
...
@@ -1362,32 +1369,7 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS
}
pGroupResInfo
->
index
+=
1
;
for
(
int32_t
j
=
0
;
j
<
numOfExprs
;
++
j
)
{
int32_t
slotId
=
pExprInfo
[
j
].
base
.
resSchema
.
slotId
;
pCtx
[
j
].
resultInfo
=
getResultEntryInfo
(
pRow
,
j
,
rowEntryOffset
);
if
(
pCtx
[
j
].
fpSet
.
finalize
)
{
#ifdef BUF_PAGE_DEBUG
qDebug
(
"
\n
page_finalize %d"
,
numOfExprs
);
#endif
int32_t
code
=
pCtx
[
j
].
fpSet
.
finalize
(
&
pCtx
[
j
],
pBlock
);
if
(
TAOS_FAILED
(
code
))
{
qError
(
"%s build result data block error, code %s"
,
GET_TASKID
(
pTaskInfo
),
tstrerror
(
code
));
T_LONG_JMP
(
pTaskInfo
->
env
,
code
);
}
}
else
if
(
strcmp
(
pCtx
[
j
].
pExpr
->
pExpr
->
_function
.
functionName
,
"_select_value"
)
==
0
)
{
// do nothing, todo refactor
}
else
{
// expand the result into multiple rows. E.g., _wstart, top(k, 20)
// the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pBlock
->
pDataBlock
,
slotId
);
char
*
in
=
GET_ROWCELL_INTERBUF
(
pCtx
[
j
].
resultInfo
);
for
(
int32_t
k
=
0
;
k
<
pRow
->
numOfRows
;
++
k
)
{
colDataAppend
(
pColInfoData
,
pBlock
->
info
.
rows
+
k
,
in
,
pCtx
[
j
].
resultInfo
->
isNullRes
);
}
}
}
doCopyResultToDataBlock
(
pExprInfo
,
numOfExprs
,
pRow
,
pCtx
,
pBlock
,
rowEntryOffset
,
pTaskInfo
);
releaseBufPage
(
pBuf
,
page
);
pBlock
->
info
.
rows
+=
pRow
->
numOfRows
;
...
...
@@ -1727,22 +1709,6 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t
static
void
doDestroyTableList
(
STableListInfo
*
pTableqinfoList
);
static
void
doTableQueryInfoTimeWindowCheck
(
SExecTaskInfo
*
pTaskInfo
,
STableQueryInfo
*
pTableQueryInfo
,
int32_t
order
)
{
#if 0
if (order == TSDB_ORDER_ASC) {
assert(
(pTableQueryInfo->win.skey <= pTableQueryInfo->win.ekey) &&
(pTableQueryInfo->lastKey >= pTaskInfo->window.skey) &&
(pTableQueryInfo->win.skey >= pTaskInfo->window.skey && pTableQueryInfo->win.ekey <= pTaskInfo->window.ekey));
} else {
assert(
(pTableQueryInfo->win.skey >= pTableQueryInfo->win.ekey) &&
(pTableQueryInfo->lastKey <= pTaskInfo->window.skey) &&
(pTableQueryInfo->win.skey <= pTaskInfo->window.skey && pTableQueryInfo->win.ekey >= pTaskInfo->window.ekey));
}
#endif
}
typedef
struct
SFetchRspHandleWrapper
{
uint32_t
exchangeId
;
int32_t
sourceIndex
;
...
...
@@ -2307,21 +2273,6 @@ _error:
static
int32_t
doInitAggInfoSup
(
SAggSupporter
*
pAggSup
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
size_t
keyBufSize
,
const
char
*
pKey
);
static
void
destroySortedMergeOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
SSortedMergeOperatorInfo
*
pInfo
=
(
SSortedMergeOperatorInfo
*
)
param
;
taosArrayDestroy
(
pInfo
->
pSortInfo
);
taosArrayDestroy
(
pInfo
->
groupInfo
);
if
(
pInfo
->
pSortHandle
!=
NULL
)
{
tsortDestroySortHandle
(
pInfo
->
pSortHandle
);
}
blockDataDestroy
(
pInfo
->
binfo
.
pRes
);
cleanupAggSup
(
&
pInfo
->
aggSup
);
taosMemoryFreeClear
(
param
);
}
static
bool
needToMerge
(
SSDataBlock
*
pBlock
,
SArray
*
groupInfo
,
char
**
buf
,
int32_t
rowIndex
)
{
size_t
size
=
taosArrayGetSize
(
groupInfo
);
if
(
size
==
0
)
{
...
...
@@ -2357,41 +2308,6 @@ static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char** buf, int3
return
0
;
}
static
void
doMergeResultImpl
(
SSortedMergeOperatorInfo
*
pInfo
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfExpr
,
int32_t
rowIndex
)
{
for
(
int32_t
j
=
0
;
j
<
numOfExpr
;
++
j
)
{
// TODO set row index
// pCtx[j].startRow = rowIndex;
}
for
(
int32_t
j
=
0
;
j
<
numOfExpr
;
++
j
)
{
int32_t
functionId
=
pCtx
[
j
].
functionId
;
// pCtx[j].fpSet->addInput(&pCtx[j]);
// if (functionId < 0) {
// SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
// doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE);
// } else {
// assert(!TSDB_FUNC_IS_SCALAR(functionId));
// aAggs[functionId].mergeFunc(&pCtx[j]);
// }
}
}
static
void
doFinalizeResultImpl
(
SqlFunctionCtx
*
pCtx
,
int32_t
numOfExpr
)
{
for
(
int32_t
j
=
0
;
j
<
numOfExpr
;
++
j
)
{
int32_t
functionId
=
pCtx
[
j
].
functionId
;
// if (functionId == FUNC_TAG_DUMMY || functionId == FUNC_TS_DUMMY) {
// continue;
// }
// if (functionId < 0) {
// SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
// doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE);
// } else {
// pCtx[j].fpSet.finalize(&pCtx[j]);
}
}
static
bool
saveCurrentTuple
(
char
**
rowColData
,
SArray
*
pColumnList
,
SSDataBlock
*
pBlock
,
int32_t
rowIndex
)
{
int32_t
size
=
(
int32_t
)
taosArrayGetSize
(
pColumnList
);
...
...
@@ -2406,210 +2322,6 @@ static bool saveCurrentTuple(char** rowColData, SArray* pColumnList, SSDataBlock
return
true
;
}
static
void
doMergeImpl
(
SOperatorInfo
*
pOperator
,
int32_t
numOfExpr
,
SSDataBlock
*
pBlock
)
{
SSortedMergeOperatorInfo
*
pInfo
=
pOperator
->
info
;
SqlFunctionCtx
*
pCtx
=
pOperator
->
exprSupp
.
pCtx
;
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
rows
;
++
i
)
{
if
(
!
pInfo
->
hasGroupVal
)
{
ASSERT
(
i
==
0
);
doMergeResultImpl
(
pInfo
,
pCtx
,
numOfExpr
,
i
);
pInfo
->
hasGroupVal
=
saveCurrentTuple
(
pInfo
->
groupVal
,
pInfo
->
groupInfo
,
pBlock
,
i
);
}
else
{
if
(
needToMerge
(
pBlock
,
pInfo
->
groupInfo
,
pInfo
->
groupVal
,
i
))
{
doMergeResultImpl
(
pInfo
,
pCtx
,
numOfExpr
,
i
);
}
else
{
doFinalizeResultImpl
(
pCtx
,
numOfExpr
);
int32_t
numOfRows
=
getNumOfResult
(
pOperator
->
exprSupp
.
pCtx
,
pOperator
->
exprSupp
.
numOfExprs
,
NULL
);
// setTagValueForMultipleRows(pCtx, pOperator->exprSupp.numOfExprs, numOfRows);
// TODO check for available buffer;
// next group info data
pInfo
->
binfo
.
pRes
->
info
.
rows
+=
numOfRows
;
for
(
int32_t
j
=
0
;
j
<
numOfExpr
;
++
j
)
{
if
(
pCtx
[
j
].
functionId
<
0
)
{
continue
;
}
pCtx
[
j
].
fpSet
.
process
(
&
pCtx
[
j
]);
}
doMergeResultImpl
(
pInfo
,
pCtx
,
numOfExpr
,
i
);
pInfo
->
hasGroupVal
=
saveCurrentTuple
(
pInfo
->
groupVal
,
pInfo
->
groupInfo
,
pBlock
,
i
);
}
}
}
}
static
SSDataBlock
*
doMerge
(
SOperatorInfo
*
pOperator
)
{
SSortedMergeOperatorInfo
*
pInfo
=
pOperator
->
info
;
SSortHandle
*
pHandle
=
pInfo
->
pSortHandle
;
SSDataBlock
*
pDataBlock
=
createOneDataBlock
(
pInfo
->
binfo
.
pRes
,
false
);
blockDataEnsureCapacity
(
pDataBlock
,
pOperator
->
resultInfo
.
capacity
);
while
(
1
)
{
blockDataCleanup
(
pDataBlock
);
while
(
1
)
{
STupleHandle
*
pTupleHandle
=
tsortNextTuple
(
pHandle
);
if
(
pTupleHandle
==
NULL
)
{
break
;
}
// build datablock for merge for one group
appendOneRowToDataBlock
(
pDataBlock
,
pTupleHandle
);
if
(
pDataBlock
->
info
.
rows
>=
pOperator
->
resultInfo
.
capacity
)
{
break
;
}
}
if
(
pDataBlock
->
info
.
rows
==
0
)
{
break
;
}
setInputDataBlock
(
pOperator
,
pOperator
->
exprSupp
.
pCtx
,
pDataBlock
,
TSDB_ORDER_ASC
,
MAIN_SCAN
,
true
);
// updateOutputBuf(&pInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows * pAggInfo->resultRowFactor,
// pOperator->pRuntimeEnv, true);
doMergeImpl
(
pOperator
,
pOperator
->
exprSupp
.
numOfExprs
,
pDataBlock
);
// flush to tuple store, and after all data have been handled, return to upstream node or sink node
}
doFinalizeResultImpl
(
pOperator
->
exprSupp
.
pCtx
,
pOperator
->
exprSupp
.
numOfExprs
);
int32_t
numOfRows
=
getNumOfResult
(
pOperator
->
exprSupp
.
pCtx
,
pOperator
->
exprSupp
.
numOfExprs
,
NULL
);
// setTagValueForMultipleRows(pCtx, pOperator->exprSupp.numOfExprs, numOfRows);
// TODO check for available buffer;
// next group info data
pInfo
->
binfo
.
pRes
->
info
.
rows
+=
numOfRows
;
return
(
pInfo
->
binfo
.
pRes
->
info
.
rows
>
0
)
?
pInfo
->
binfo
.
pRes
:
NULL
;
}
SSDataBlock
*
getSortedMergeBlockData
(
SSortHandle
*
pHandle
,
SSDataBlock
*
pDataBlock
,
int32_t
capacity
,
SArray
*
pColMatchInfo
,
SSortedMergeOperatorInfo
*
pInfo
)
{
blockDataCleanup
(
pDataBlock
);
SSDataBlock
*
p
=
tsortGetSortedDataBlock
(
pHandle
);
if
(
p
==
NULL
)
{
return
NULL
;
}
blockDataEnsureCapacity
(
p
,
capacity
);
while
(
1
)
{
STupleHandle
*
pTupleHandle
=
tsortNextTuple
(
pHandle
);
if
(
pTupleHandle
==
NULL
)
{
break
;
}
appendOneRowToDataBlock
(
p
,
pTupleHandle
);
if
(
p
->
info
.
rows
>=
capacity
)
{
break
;
}
}
if
(
p
->
info
.
rows
>
0
)
{
int32_t
numOfCols
=
taosArrayGetSize
(
pColMatchInfo
);
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColMatchInfo
*
pmInfo
=
taosArrayGet
(
pColMatchInfo
,
i
);
ASSERT
(
pmInfo
->
matchType
==
COL_MATCH_FROM_SLOT_ID
);
SColumnInfoData
*
pSrc
=
taosArrayGet
(
p
->
pDataBlock
,
pmInfo
->
srcSlotId
);
SColumnInfoData
*
pDst
=
taosArrayGet
(
pDataBlock
->
pDataBlock
,
pmInfo
->
targetSlotId
);
colDataAssign
(
pDst
,
pSrc
,
p
->
info
.
rows
,
&
pDataBlock
->
info
);
}
pDataBlock
->
info
.
rows
=
p
->
info
.
rows
;
pDataBlock
->
info
.
capacity
=
p
->
info
.
rows
;
}
blockDataDestroy
(
p
);
return
(
pDataBlock
->
info
.
rows
>
0
)
?
pDataBlock
:
NULL
;
}
static
SSDataBlock
*
doSortedMerge
(
SOperatorInfo
*
pOperator
)
{
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
return
NULL
;
}
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SSortedMergeOperatorInfo
*
pInfo
=
pOperator
->
info
;
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
return
getSortedMergeBlockData
(
pInfo
->
pSortHandle
,
pInfo
->
binfo
.
pRes
,
pOperator
->
resultInfo
.
capacity
,
NULL
,
pInfo
);
}
int32_t
numOfBufPage
=
pInfo
->
sortBufSize
/
pInfo
->
bufPageSize
;
pInfo
->
pSortHandle
=
tsortCreateSortHandle
(
pInfo
->
pSortInfo
,
SORT_MULTISOURCE_MERGE
,
pInfo
->
bufPageSize
,
numOfBufPage
,
pInfo
->
binfo
.
pRes
,
"GET_TASKID(pTaskInfo)"
);
tsortSetFetchRawDataFp
(
pInfo
->
pSortHandle
,
loadNextDataBlock
,
NULL
,
NULL
);
for
(
int32_t
i
=
0
;
i
<
pOperator
->
numOfDownstream
;
++
i
)
{
SSortSource
*
ps
=
taosMemoryCalloc
(
1
,
sizeof
(
SSortSource
));
ps
->
param
=
pOperator
->
pDownstream
[
i
];
tsortAddSource
(
pInfo
->
pSortHandle
,
ps
);
}
int32_t
code
=
tsortOpen
(
pInfo
->
pSortHandle
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
T_LONG_JMP
(
pTaskInfo
->
env
,
terrno
);
}
pOperator
->
status
=
OP_RES_TO_RETURN
;
return
doMerge
(
pOperator
);
}
static
int32_t
initGroupCol
(
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SArray
*
pGroupInfo
,
SSortedMergeOperatorInfo
*
pInfo
)
{
if
(
pGroupInfo
==
NULL
||
taosArrayGetSize
(
pGroupInfo
)
==
0
)
{
return
0
;
}
int32_t
len
=
0
;
SArray
*
plist
=
taosArrayInit
(
3
,
sizeof
(
SColumn
));
pInfo
->
groupInfo
=
taosArrayInit
(
3
,
sizeof
(
int32_t
));
if
(
plist
==
NULL
||
pInfo
->
groupInfo
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
size_t
numOfGroupCol
=
taosArrayGetSize
(
pInfo
->
groupInfo
);
for
(
int32_t
i
=
0
;
i
<
numOfGroupCol
;
++
i
)
{
SColumn
*
pCol
=
taosArrayGet
(
pGroupInfo
,
i
);
for
(
int32_t
j
=
0
;
j
<
numOfCols
;
++
j
)
{
SExprInfo
*
pe
=
&
pExprInfo
[
j
];
if
(
pe
->
base
.
resSchema
.
slotId
==
pCol
->
colId
)
{
taosArrayPush
(
plist
,
pCol
);
taosArrayPush
(
pInfo
->
groupInfo
,
&
j
);
len
+=
pCol
->
bytes
;
break
;
}
}
}
ASSERT
(
taosArrayGetSize
(
pGroupInfo
)
==
taosArrayGetSize
(
plist
));
pInfo
->
groupVal
=
taosMemoryCalloc
(
1
,
(
POINTER_BYTES
*
numOfGroupCol
+
len
));
if
(
pInfo
->
groupVal
==
NULL
)
{
taosArrayDestroy
(
plist
);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
int32_t
offset
=
0
;
char
*
start
=
(
char
*
)(
pInfo
->
groupVal
+
(
POINTER_BYTES
*
numOfGroupCol
));
for
(
int32_t
i
=
0
;
i
<
numOfGroupCol
;
++
i
)
{
pInfo
->
groupVal
[
i
]
=
start
+
offset
;
SColumn
*
pCol
=
taosArrayGet
(
plist
,
i
);
offset
+=
pCol
->
bytes
;
}
taosArrayDestroy
(
plist
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
getTableScanInfo
(
SOperatorInfo
*
pOperator
,
int32_t
*
order
,
int32_t
*
scanFlag
)
{
// todo add more information about exchange operation
int32_t
type
=
pOperator
->
operatorType
;
...
...
@@ -3342,13 +3054,6 @@ void cleanupBasicInfo(SOptrBasicInfo* pInfo) {
pInfo
->
pRes
=
blockDataDestroy
(
pInfo
->
pRes
);
}
void
destroyBasicOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
SOptrBasicInfo
*
pInfo
=
(
SOptrBasicInfo
*
)
param
;
cleanupBasicInfo
(
pInfo
);
taosMemoryFreeClear
(
param
);
}
static
void
freeItem
(
void
*
pItem
)
{
void
**
p
=
pItem
;
if
(
*
p
!=
NULL
)
{
...
...
@@ -3855,7 +3560,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
STagScanPhysiNode
*
pScanPhyNode
=
(
STagScanPhysiNode
*
)
pPhyNode
;
int32_t
code
=
getTableList
(
pHandle
->
meta
,
pHandle
->
vnode
,
pScanPhyNode
,
pTagCond
,
pTagIndexCond
,
pTableListInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
pTaskInfo
->
code
=
terrno
;
pTaskInfo
->
code
=
code
;
qError
(
"failed to getTableList, code: %s"
,
tstrerror
(
code
));
return
NULL
;
}
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
36c4b981
...
...
@@ -46,19 +46,6 @@ static SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, co
uint64_t
groupId
);
static
void
doCloseWindow
(
SResultRowInfo
*
pResultRowInfo
,
const
SIntervalAggOperatorInfo
*
pInfo
,
SResultRow
*
pResult
);
///*
// * There are two cases to handle:
// *
// * 1. Query range is not set yet (queryRangeSet = 0). we need to set the query range info, including
// * pQueryAttr->lastKey, pQueryAttr->window.skey, and pQueryAttr->eKey.
// * 2. Query range is set and query is in progress. There may be another result with the same query ranges to be
// * merged during merge stage. In this case, we need the pTableQueryInfo->lastResRows to decide if there
// * is a previous result generated or not.
// */
// static void setIntervalQueryRange(STableQueryInfo* pTableQueryInfo, TSKEY key, STimeWindow* pQRange) {
// // do nothing
//}
static
TSKEY
getStartTsKey
(
STimeWindow
*
win
,
const
TSKEY
*
tsCols
)
{
return
tsCols
==
NULL
?
win
->
skey
:
tsCols
[
0
];
}
static
int32_t
setTimeWindowOutputBuf
(
SResultRowInfo
*
pResultRowInfo
,
STimeWindow
*
win
,
bool
masterscan
,
...
...
@@ -3011,9 +2998,9 @@ static void addRetriveWindow(SArray* wins, SStreamFinalIntervalOperatorInfo* pIn
SPullWindowInfo
pull
=
{.
window
=
nextWin
,
.
groupId
=
winKey
->
groupId
};
// add pull data request
savePullWindow
(
&
pull
,
pInfo
->
pPullWins
);
int32_t
size
=
taosArrayGetSize
(
pInfo
->
pChildren
);
addPullWindow
(
pInfo
->
pPullDataMap
,
winKey
,
size
);
qDebug
(
"===stream===prepare retrive for delete %"
PRId64
", size:%d"
,
winKey
->
ts
,
size
);
int32_t
size
1
=
taosArrayGetSize
(
pInfo
->
pChildren
);
addPullWindow
(
pInfo
->
pPullDataMap
,
winKey
,
size
1
);
qDebug
(
"===stream===prepare retrive for delete %"
PRId64
", size:%d"
,
winKey
->
ts
,
size
1
);
}
}
}
...
...
@@ -4895,72 +4882,65 @@ _error:
return
NULL
;
}
void
destroyM
ergeAlignedInterval
OperatorInfo
(
void
*
param
)
{
void
destroyM
AI
OperatorInfo
(
void
*
param
)
{
SMergeAlignedIntervalAggOperatorInfo
*
miaInfo
=
(
SMergeAlignedIntervalAggOperatorInfo
*
)
param
;
destroyIntervalOperatorInfo
(
miaInfo
->
intervalAggOperatorInfo
);
taosMemoryFreeClear
(
param
);
}
static
int32_t
outputMergeAlignedIntervalResult
(
SOperatorInfo
*
pOperatorInfo
,
uint64_t
tableGroupId
,
SSDataBlock
*
pResultBlock
,
TSKEY
wstartTs
)
{
SMergeAlignedIntervalAggOperatorInfo
*
miaInfo
=
pOperatorInfo
->
info
;
SIntervalAggOperatorInfo
*
iaInfo
=
miaInfo
->
intervalAggOperatorInfo
;
SExecTaskInfo
*
pTaskInfo
=
pOperatorInfo
->
pTaskInfo
;
SExprSupp
*
pSup
=
&
pOperatorInfo
->
exprSupp
;
SET_RES_WINDOW_KEY
(
iaInfo
->
aggSup
.
keyBuf
,
&
wstartTs
,
TSDB_KEYSIZE
,
tableGroupId
);
SResultRowPosition
*
p1
=
(
SResultRowPosition
*
)
tSimpleHashGet
(
iaInfo
->
aggSup
.
pResultRowHashTable
,
iaInfo
->
aggSup
.
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
TSDB_KEYSIZE
));
ASSERT
(
p1
!=
NULL
);
static
SResultRow
*
doSetSingleOutputTupleBuf
(
SResultRowInfo
*
pResultRowInfo
,
SAggSupporter
*
pSup
)
{
SResultRow
*
pResult
=
getNewResultRow
(
pSup
->
pResultBuf
,
&
pSup
->
currentPageId
,
pSup
->
resultRowSize
);
pResultRowInfo
->
cur
=
(
SResultRowPosition
){.
pageId
=
pResult
->
pageId
,
.
offset
=
pResult
->
offset
};
return
pResult
;
}
finalizeResultRowIntoResultDataBlock
(
iaInfo
->
aggSup
.
pResultBuf
,
p1
,
pSup
->
pCtx
,
pSup
->
pExprInfo
,
pSup
->
numOfExprs
,
pSup
->
rowEntryInfoOffset
,
pResultBlock
,
pTaskInfo
);
tSimpleHashRemove
(
iaInfo
->
aggSup
.
pResultRowHashTable
,
iaInfo
->
aggSup
.
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
TSDB_KEYSIZE
));
ASSERT
(
tSimpleHashGetSize
(
iaInfo
->
aggSup
.
pResultRowHashTable
)
==
0
);
static
int32_t
setSingleOutputTupleBuf
(
SResultRowInfo
*
pResultRowInfo
,
STimeWindow
*
win
,
SResultRow
**
pResult
,
SExprSupp
*
pExprSup
,
SAggSupporter
*
pAggSup
)
{
if
(
*
pResult
==
NULL
)
{
*
pResult
=
doSetSingleOutputTupleBuf
(
pResultRowInfo
,
pAggSup
);
if
(
*
pResult
==
NULL
)
{
return
terrno
;
}
}
// set time window for current result
(
*
pResult
)
->
win
=
(
*
win
);
setResultRowInitCtx
((
*
pResult
),
pExprSup
->
pCtx
,
pExprSup
->
numOfExprs
,
pExprSup
->
rowEntryInfoOffset
);
return
TSDB_CODE_SUCCESS
;
}
static
void
doMergeAlignedIntervalAggImpl
(
SOperatorInfo
*
pOperatorInfo
,
SResultRowInfo
*
pResultRowInfo
,
SSDataBlock
*
pBlock
,
int32_t
scanFlag
,
SSDataBlock
*
pResultBlock
)
{
SSDataBlock
*
pBlock
,
SSDataBlock
*
pResultBlock
)
{
SMergeAlignedIntervalAggOperatorInfo
*
miaInfo
=
pOperatorInfo
->
info
;
SIntervalAggOperatorInfo
*
iaInfo
=
miaInfo
->
intervalAggOperatorInfo
;
SExecTaskInfo
*
pTaskInfo
=
pOperatorInfo
->
pTaskInfo
;
SExprSupp
*
pSup
=
&
pOperatorInfo
->
exprSupp
;
SInterval
*
pInterval
=
&
iaInfo
->
interval
;
int32_t
startPos
=
0
;
int32_t
numOfOutput
=
pSup
->
numOfExprs
;
int64_t
*
tsCols
=
extractTsCol
(
pBlock
,
iaInfo
);
uint64_t
tableGroupId
=
pBlock
->
info
.
groupId
;
SResultRow
*
pResult
=
NULL
;
int32_t
startPos
=
0
;
int64_t
*
tsCols
=
extractTsCol
(
pBlock
,
iaInfo
);
TSKEY
ts
=
getStartTsKey
(
&
pBlock
->
info
.
window
,
tsCols
);
// there is an result exists
if
(
miaInfo
->
curTs
!=
INT64_MIN
)
{
ASSERT
(
tSimpleHashGetSize
(
iaInfo
->
aggSup
.
pResultRowHashTable
)
==
1
);
if
(
ts
!=
miaInfo
->
curTs
)
{
outputMergeAlignedIntervalResult
(
pOperatorInfo
,
tableGroupId
,
pResultBlock
,
miaInfo
->
curTs
);
finalizeResultRows
(
iaInfo
->
aggSup
.
pResultBuf
,
&
pResultRowInfo
->
cur
,
pSup
,
pResultBlock
,
pTaskInfo
);
resetResultRow
(
miaInfo
->
pResultRow
,
iaInfo
->
aggSup
.
resultRowSize
-
sizeof
(
SResultRow
));
miaInfo
->
curTs
=
ts
;
}
}
else
{
miaInfo
->
curTs
=
ts
;
ASSERT
(
tSimpleHashGetSize
(
iaInfo
->
aggSup
.
pResultRowHashTable
)
==
0
);
}
STimeWindow
win
=
{
0
};
win
.
skey
=
miaInfo
->
curTs
;
win
.
ekey
=
taosTimeAdd
(
win
.
skey
,
iaInfo
->
interval
.
interval
,
iaInfo
->
interval
.
intervalUnit
,
iaInfo
->
interval
.
precision
)
-
1
;
win
.
ekey
=
taosTimeAdd
(
win
.
skey
,
pInterval
->
interval
,
pInterval
->
intervalUnit
,
pInterval
->
precision
)
-
1
;
// TODO: remove the hash table (groupid + winkey => result row position)
int32_t
ret
=
setTimeWindowOutputBuf
(
pResultRowInfo
,
&
win
,
(
scanFlag
==
MAIN_SCAN
),
&
pResult
,
tableGroupId
,
pSup
->
pCtx
,
numOfOutput
,
pSup
->
rowEntryInfoOffset
,
&
iaInfo
->
aggSup
,
pTaskInfo
);
if
(
ret
!=
TSDB_CODE_SUCCESS
||
pResult
==
NULL
)
{
T_LONG_JMP
(
pTaskInfo
->
env
,
TSDB_CODE_QRY_OUT_OF_MEMORY
);
int32_t
ret
=
setSingleOutputTupleBuf
(
pResultRowInfo
,
&
win
,
&
miaInfo
->
pResultRow
,
pSup
,
&
iaInfo
->
aggSup
);
if
(
ret
!=
TSDB_CODE_SUCCESS
||
miaInfo
->
pResultRow
==
NULL
)
{
T_LONG_JMP
(
pTaskInfo
->
env
,
ret
);
}
int32_t
currPos
=
startPos
;
...
...
@@ -4973,21 +4953,19 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
updateTimeWindowInfo
(
&
iaInfo
->
twAggSup
.
timeWindowData
,
&
currWin
,
true
);
doApplyFunctions
(
pTaskInfo
,
pSup
->
pCtx
,
&
iaInfo
->
twAggSup
.
timeWindowData
,
startPos
,
currPos
-
startPos
,
pBlock
->
info
.
rows
,
numOfOutput
);
pBlock
->
info
.
rows
,
pSup
->
numOfExprs
);
outputMergeAlignedIntervalResult
(
pOperatorInfo
,
tableGroupId
,
pResultBlock
,
miaInfo
->
curTs
);
finalizeResultRows
(
iaInfo
->
aggSup
.
pResultBuf
,
&
pResultRowInfo
->
cur
,
pSup
,
pResultBlock
,
pTaskInfo
);
resetResultRow
(
miaInfo
->
pResultRow
,
iaInfo
->
aggSup
.
resultRowSize
-
sizeof
(
SResultRow
));
miaInfo
->
curTs
=
tsCols
[
currPos
];
currWin
.
skey
=
miaInfo
->
curTs
;
currWin
.
ekey
=
taosTimeAdd
(
currWin
.
skey
,
iaInfo
->
interval
.
interval
,
iaInfo
->
interval
.
intervalUnit
,
iaInfo
->
interval
.
precision
)
-
1
;
currWin
.
ekey
=
taosTimeAdd
(
currWin
.
skey
,
pInterval
->
interval
,
pInterval
->
intervalUnit
,
pInterval
->
precision
)
-
1
;
startPos
=
currPos
;
ret
=
setTimeWindowOutputBuf
(
pResultRowInfo
,
&
currWin
,
(
scanFlag
==
MAIN_SCAN
),
&
pResult
,
tableGroupId
,
pSup
->
pCtx
,
numOfOutput
,
pSup
->
rowEntryInfoOffset
,
&
iaInfo
->
aggSup
,
pTaskInfo
);
if
(
ret
!=
TSDB_CODE_SUCCESS
||
pResult
==
NULL
)
{
T_LONG_JMP
(
pTaskInfo
->
env
,
TSDB_CODE_QRY_OUT_OF_MEMORY
);
ret
=
setSingleOutputTupleBuf
(
pResultRowInfo
,
&
win
,
&
miaInfo
->
pResultRow
,
pSup
,
&
iaInfo
->
aggSup
);
if
(
ret
!=
TSDB_CODE_SUCCESS
||
miaInfo
->
pResultRow
==
NULL
)
{
T_LONG_JMP
(
pTaskInfo
->
env
,
ret
);
}
miaInfo
->
curTs
=
currWin
.
skey
;
...
...
@@ -4995,68 +4973,79 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
updateTimeWindowInfo
(
&
iaInfo
->
twAggSup
.
timeWindowData
,
&
currWin
,
true
);
doApplyFunctions
(
pTaskInfo
,
pSup
->
pCtx
,
&
iaInfo
->
twAggSup
.
timeWindowData
,
startPos
,
currPos
-
startPos
,
pBlock
->
info
.
rows
,
numOfOutput
);
pBlock
->
info
.
rows
,
pSup
->
numOfExprs
);
}
static
void
cleanupAfterGroupResultGen
(
SMergeAlignedIntervalAggOperatorInfo
*
pMiaInfo
,
SSDataBlock
*
pRes
)
{
pRes
->
info
.
groupId
=
pMiaInfo
->
groupId
;
pMiaInfo
->
curTs
=
INT64_MIN
;
pMiaInfo
->
groupId
=
0
;
}
static
void
doMergeAlignedIntervalAgg
(
SOperatorInfo
*
pOperator
)
{
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SMergeAlignedIntervalAggOperatorInfo
*
miaInfo
=
pOperator
->
info
;
SIntervalAggOperatorInfo
*
iaInfo
=
miaInfo
->
intervalAggOperatorInfo
;
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
SSDataBlock
*
pRes
=
iaInfo
->
binfo
.
pRes
;
SMergeAlignedIntervalAggOperatorInfo
*
pMiaInfo
=
pOperator
->
info
;
SIntervalAggOperatorInfo
*
pIaInfo
=
pMiaInfo
->
intervalAggOperatorInfo
;
SOperatorInfo
*
downstream
=
pOperator
->
pDownstream
[
0
];
int32_t
scanFlag
=
MAIN_SCAN
;
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
SSDataBlock
*
pRes
=
pIaInfo
->
binfo
.
pRes
;
SResultRowInfo
*
pResultRowInfo
=
&
pIaInfo
->
binfo
.
resultRowInfo
;
SOperatorInfo
*
downstream
=
pOperator
->
pDownstream
[
0
];
int32_t
scanFlag
=
MAIN_SCAN
;
while
(
1
)
{
SSDataBlock
*
pBlock
=
NULL
;
if
(
m
iaInfo
->
prefetchedBlock
==
NULL
)
{
if
(
pM
iaInfo
->
prefetchedBlock
==
NULL
)
{
pBlock
=
downstream
->
fpSet
.
getNextFn
(
downstream
);
}
else
{
pBlock
=
m
iaInfo
->
prefetchedBlock
;
m
iaInfo
->
prefetchedBlock
=
NULL
;
pBlock
=
pM
iaInfo
->
prefetchedBlock
;
pM
iaInfo
->
prefetchedBlock
=
NULL
;
m
iaInfo
->
groupId
=
pBlock
->
info
.
groupId
;
pM
iaInfo
->
groupId
=
pBlock
->
info
.
groupId
;
}
// no data exists, all query processing is done
if
(
pBlock
==
NULL
)
{
// close last un
finaliz
ed time window
if
(
m
iaInfo
->
curTs
!=
INT64_MIN
)
{
ASSERT
(
tSimpleHashGetSize
(
iaInfo
->
aggSup
.
pResultRowHashTable
)
==
1
);
outputMergeAlignedIntervalResult
(
pOperator
,
miaInfo
->
groupId
,
pRes
,
miaInfo
->
curTs
);
miaInfo
->
curTs
=
INT64_MIN
;
// close last un
clos
ed time window
if
(
pM
iaInfo
->
curTs
!=
INT64_MIN
)
{
finalizeResultRows
(
pIaInfo
->
aggSup
.
pResultBuf
,
&
pResultRowInfo
->
cur
,
pSup
,
pRes
,
pTaskInfo
);
resetResultRow
(
pMiaInfo
->
pResultRow
,
pIaInfo
->
aggSup
.
resultRowSize
-
sizeof
(
SResultRow
)
);
cleanupAfterGroupResultGen
(
pMiaInfo
,
pRes
)
;
}
doSetOperatorCompleted
(
pOperator
);
break
;
}
if
(
!
miaInfo
->
hasGroupId
)
{
miaInfo
->
hasGroupId
=
true
;
miaInfo
->
groupId
=
pBlock
->
info
.
groupId
;
}
else
if
(
miaInfo
->
groupId
!=
pBlock
->
info
.
groupId
)
{
// if there are unclosed time window, close it firstly.
ASSERT
(
miaInfo
->
curTs
!=
INT64_MIN
);
outputMergeAlignedIntervalResult
(
pOperator
,
miaInfo
->
groupId
,
pRes
,
miaInfo
->
curTs
);
miaInfo
->
prefetchedBlock
=
pBlock
;
miaInfo
->
curTs
=
INT64_MIN
;
break
;
if
(
pMiaInfo
->
groupId
==
0
)
{
if
(
pMiaInfo
->
groupId
!=
pBlock
->
info
.
groupId
)
{
pMiaInfo
->
groupId
=
pBlock
->
info
.
groupId
;
}
}
else
{
if
(
pMiaInfo
->
groupId
!=
pBlock
->
info
.
groupId
)
{
// if there are unclosed time window, close it firstly.
ASSERT
(
pMiaInfo
->
curTs
!=
INT64_MIN
);
finalizeResultRows
(
pIaInfo
->
aggSup
.
pResultBuf
,
&
pResultRowInfo
->
cur
,
pSup
,
pRes
,
pTaskInfo
);
resetResultRow
(
pMiaInfo
->
pResultRow
,
pIaInfo
->
aggSup
.
resultRowSize
-
sizeof
(
SResultRow
));
pMiaInfo
->
prefetchedBlock
=
pBlock
;
cleanupAfterGroupResultGen
(
pMiaInfo
,
pRes
);
break
;
}
else
{
// continue
}
}
getTableScanInfo
(
pOperator
,
&
i
aInfo
->
inputOrder
,
&
scanFlag
);
setInputDataBlock
(
pOperator
,
pSup
->
pCtx
,
pBlock
,
i
aInfo
->
inputOrder
,
scanFlag
,
true
);
doMergeAlignedIntervalAggImpl
(
pOperator
,
&
iaInfo
->
binfo
.
resultRowInfo
,
pBlock
,
scanFlag
,
pRes
);
getTableScanInfo
(
pOperator
,
&
pI
aInfo
->
inputOrder
,
&
scanFlag
);
setInputDataBlock
(
pOperator
,
pSup
->
pCtx
,
pBlock
,
pI
aInfo
->
inputOrder
,
scanFlag
,
true
);
doMergeAlignedIntervalAggImpl
(
pOperator
,
&
pIaInfo
->
binfo
.
resultRowInfo
,
pBlock
,
pRes
);
doFilter
(
m
iaInfo
->
pCondition
,
pRes
,
NULL
);
doFilter
(
pM
iaInfo
->
pCondition
,
pRes
,
NULL
);
if
(
pRes
->
info
.
rows
>=
pOperator
->
resultInfo
.
capacity
)
{
break
;
}
}
pRes
->
info
.
groupId
=
miaInfo
->
groupId
;
miaInfo
->
hasGroupId
=
false
;
}
static
SSDataBlock
*
mergeAlignedIntervalAgg
(
SOperatorInfo
*
pOperator
)
{
...
...
@@ -5155,7 +5144,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
pOperator
->
info
=
miaInfo
;
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
mergeAlignedIntervalAgg
,
NULL
,
NULL
,
destroyM
ergeAlignedInterval
OperatorInfo
,
NULL
,
NULL
,
NULL
);
destroyM
AI
OperatorInfo
,
NULL
,
NULL
,
NULL
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -5165,7 +5154,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
return
pOperator
;
_error:
destroyM
ergeAlignedInterval
OperatorInfo
(
miaInfo
);
destroyM
AI
OperatorInfo
(
miaInfo
);
taosMemoryFreeClear
(
pOperator
);
pTaskInfo
->
code
=
code
;
return
NULL
;
...
...
@@ -5208,8 +5197,7 @@ static int32_t finalizeWindowResult(SOperatorInfo* pOperatorInfo, uint64_t table
SResultRowPosition
*
p1
=
(
SResultRowPosition
*
)
tSimpleHashGet
(
iaInfo
->
aggSup
.
pResultRowHashTable
,
iaInfo
->
aggSup
.
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
TSDB_KEYSIZE
));
ASSERT
(
p1
!=
NULL
);
finalizeResultRowIntoResultDataBlock
(
iaInfo
->
aggSup
.
pResultBuf
,
p1
,
pExprSup
->
pCtx
,
pExprSup
->
pExprInfo
,
pExprSup
->
numOfExprs
,
pExprSup
->
rowEntryInfoOffset
,
pResultBlock
,
pTaskInfo
);
// finalizeResultRows(iaInfo->aggSup.pResultBuf, p1, pResultBlock, pTaskInfo);
tSimpleHashRemove
(
iaInfo
->
aggSup
.
pResultRowHashTable
,
iaInfo
->
aggSup
.
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
TSDB_KEYSIZE
));
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -5218,9 +5206,7 @@ static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t t
STimeWindow
*
newWin
)
{
SMergeIntervalAggOperatorInfo
*
miaInfo
=
pOperatorInfo
->
info
;
SIntervalAggOperatorInfo
*
iaInfo
=
&
miaInfo
->
intervalAggOperatorInfo
;
SExecTaskInfo
*
pTaskInfo
=
pOperatorInfo
->
pTaskInfo
;
bool
ascScan
=
(
iaInfo
->
inputOrder
==
TSDB_ORDER_ASC
);
SExprSupp
*
pExprSup
=
&
pOperatorInfo
->
exprSupp
;
SGroupTimeWindow
groupTimeWindow
=
{.
groupId
=
tableGroupId
,
.
window
=
*
newWin
};
tdListAppend
(
miaInfo
->
groupIntervals
,
&
groupTimeWindow
);
...
...
@@ -5233,9 +5219,10 @@ static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t t
if
(
prevGrpWin
->
groupId
!=
tableGroupId
)
{
continue
;
}
STimeWindow
*
prevWin
=
&
prevGrpWin
->
window
;
if
((
ascScan
&&
newWin
->
skey
>
prevWin
->
ekey
)
||
((
!
ascScan
)
&&
newWin
->
skey
<
prevWin
->
ekey
))
{
finalizeWindowResult
(
pOperatorInfo
,
tableGroupId
,
prevWin
,
pResultBlock
);
//
finalizeWindowResult(pOperatorInfo, tableGroupId, prevWin, pResultBlock);
tdListPopNode
(
miaInfo
->
groupIntervals
,
listNode
);
}
}
...
...
@@ -5395,7 +5382,7 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
if
(
listNode
!=
NULL
)
{
SGroupTimeWindow
*
grpWin
=
(
SGroupTimeWindow
*
)(
listNode
->
data
);
finalizeWindowResult
(
pOperator
,
grpWin
->
groupId
,
&
grpWin
->
window
,
pRes
);
//
finalizeWindowResult(pOperator, grpWin->groupId, &grpWin->window, pRes);
pRes
->
info
.
groupId
=
grpWin
->
groupId
;
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录