Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
b3ce29dc
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1193
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
b3ce29dc
编写于
6月 10, 2022
作者:
S
shenglian zhou
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
add time window operator
上级
62780bbf
变更
3
显示空白变更内容
内联
并排
Showing
3 changed file
with
90 addition
and
75 deletion
+90
-75
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+3
-0
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+20
-4
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+67
-71
未找到文件。
source/libs/executor/inc/executorimpl.h
浏览文件 @
b3ce29dc
...
@@ -797,6 +797,9 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
...
@@ -797,6 +797,9 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
SOperatorInfo
*
createIntervalOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SOperatorInfo
*
createIntervalOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
SInterval
*
pInterval
,
int32_t
primaryTsSlotId
,
SSDataBlock
*
pResBlock
,
SInterval
*
pInterval
,
int32_t
primaryTsSlotId
,
STimeWindowAggSupp
*
pTwAggSupp
,
SExecTaskInfo
*
pTaskInfo
);
STimeWindowAggSupp
*
pTwAggSupp
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createMergeIntervalOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
SInterval
*
pInterval
,
int32_t
primaryTsSlotId
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createStreamFinalIntervalOperatorInfo
(
SOperatorInfo
*
downstream
,
SOperatorInfo
*
createStreamFinalIntervalOperatorInfo
(
SOperatorInfo
*
downstream
,
SPhysiNode
*
pPhyNode
,
SExecTaskInfo
*
pTaskInfo
,
int32_t
numOfChild
);
SPhysiNode
*
pPhyNode
,
SExecTaskInfo
*
pTaskInfo
,
int32_t
numOfChild
);
SOperatorInfo
*
createStreamIntervalOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SOperatorInfo
*
createStreamIntervalOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
b3ce29dc
...
@@ -1956,8 +1956,9 @@ static void doUpdateNumOfRows(SResultRow* pRow, int32_t numOfExprs, const int32_
...
@@ -1956,8 +1956,9 @@ static void doUpdateNumOfRows(SResultRow* pRow, int32_t numOfExprs, const int32_
}
}
int32_t
finalizeResultRowIntoResultDataBlock
(
SDiskbasedBuf
*
pBuf
,
SResultRowPosition
*
resultRowPosition
,
int32_t
finalizeResultRowIntoResultDataBlock
(
SDiskbasedBuf
*
pBuf
,
SResultRowPosition
*
resultRowPosition
,
SqlFunctionCtx
*
pCtx
,
SExprInfo
*
pExprInfo
,
int32_t
numOfExprs
,
const
int32_t
*
rowCellOffset
,
SqlFunctionCtx
*
pCtx
,
SExprInfo
*
pExprInfo
,
int32_t
numOfExprs
,
SSDataBlock
*
pBlock
,
SExecTaskInfo
*
pTaskInfo
)
{
const
int32_t
*
rowCellOffset
,
SSDataBlock
*
pBlock
,
SExecTaskInfo
*
pTaskInfo
)
{
SFilePage
*
page
=
getBufPage
(
pBuf
,
resultRowPosition
->
pageId
);
SFilePage
*
page
=
getBufPage
(
pBuf
,
resultRowPosition
->
pageId
);
SResultRow
*
pRow
=
(
SResultRow
*
)((
char
*
)
page
+
resultRowPosition
->
offset
);
SResultRow
*
pRow
=
(
SResultRow
*
)((
char
*
)
page
+
resultRowPosition
->
offset
);
...
@@ -4665,6 +4666,21 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
...
@@ -4665,6 +4666,21 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
int32_t
tsSlotId
=
((
SColumnNode
*
)
pIntervalPhyNode
->
window
.
pTspk
)
->
slotId
;
int32_t
tsSlotId
=
((
SColumnNode
*
)
pIntervalPhyNode
->
window
.
pTspk
)
->
slotId
;
pOptr
=
createIntervalOperatorInfo
(
ops
[
0
],
pExprInfo
,
num
,
pResBlock
,
&
interval
,
tsSlotId
,
&
as
,
pTaskInfo
);
pOptr
=
createIntervalOperatorInfo
(
ops
[
0
],
pExprInfo
,
num
,
pResBlock
,
&
interval
,
tsSlotId
,
&
as
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL
==
type
)
{
SMergeIntervalPhysiNode
*
pIntervalPhyNode
=
(
SMergeIntervalPhysiNode
*
)
pPhyNode
;
SExprInfo
*
pExprInfo
=
createExprInfo
(
pIntervalPhyNode
->
window
.
pFuncs
,
NULL
,
&
num
);
SSDataBlock
*
pResBlock
=
createResDataBlock
(
pPhyNode
->
pOutputDataBlockDesc
);
SInterval
interval
=
{.
interval
=
pIntervalPhyNode
->
interval
,
.
sliding
=
pIntervalPhyNode
->
sliding
,
.
intervalUnit
=
pIntervalPhyNode
->
intervalUnit
,
.
slidingUnit
=
pIntervalPhyNode
->
slidingUnit
,
.
offset
=
pIntervalPhyNode
->
offset
,
.
precision
=
((
SColumnNode
*
)
pIntervalPhyNode
->
window
.
pTspk
)
->
node
.
resType
.
precision
};
int32_t
tsSlotId
=
((
SColumnNode
*
)
pIntervalPhyNode
->
window
.
pTspk
)
->
slotId
;
pOptr
=
createMergeIntervalOperatorInfo
(
ops
[
0
],
pExprInfo
,
num
,
pResBlock
,
&
interval
,
tsSlotId
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL
==
type
)
{
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL
==
type
)
{
int32_t
children
=
8
;
int32_t
children
=
8
;
pOptr
=
createStreamFinalIntervalOperatorInfo
(
ops
[
0
],
pPhyNode
,
pTaskInfo
,
children
);
pOptr
=
createStreamFinalIntervalOperatorInfo
(
ops
[
0
],
pPhyNode
,
pTaskInfo
,
children
);
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
b3ce29dc
...
@@ -1870,8 +1870,8 @@ _error:
...
@@ -1870,8 +1870,8 @@ _error:
return NULL;
return NULL;
}
}
static
void
doHashInterval
(
SOperatorInfo
*
pOperatorInfo
,
SSDataBlock
*
pSDataBlock
,
static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock,
int32_t tableGroupId,
int32_t
tableGroupId
,
SArray
*
pUpdated
)
{
SArray* pUpdated) {
SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo*)pOperatorInfo->info;
SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo*)pOperatorInfo->info;
SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo);
SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo);
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
...
@@ -1886,7 +1886,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
...
@@ -1886,7 +1886,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
tsCols = (int64_t*)pColDataInfo->pData;
tsCols = (int64_t*)pColDataInfo->pData;
} else {
} else {
return
;
return;
}
}
int32_t startPos = ascScan ? 0 : (pSDataBlock->info.rows - 1);
int32_t startPos = ascScan ? 0 : (pSDataBlock->info.rows - 1);
...
@@ -1903,13 +1903,14 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
...
@@ -1903,13 +1903,14 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
pos->groupId = tableGroupId;
pos->groupId = tableGroupId;
pos->pos = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
pos->pos = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
*(int64_t*)pos->key = pResult->win.skey;
*(int64_t*)pos->key = pResult->win.skey;
forwardRows
=
getNumOfRowsInTimeWindow
(
&
pSDataBlock
->
info
,
tsCols
,
startPos
,
forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos,
nextWin.ekey, binarySearchForKey, NULL,
nextWin
.
ekey
,
binarySearchForKey
,
NULL
,
TSDB_ORDER_ASC
);
TSDB_ORDER_ASC);
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdated) {
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdated) {
saveResult(pResult, tableGroupId, pUpdated);
saveResult(pResult, tableGroupId, pUpdated);
}
}
// window start(end) key interpolation
// window start(end) key interpolation
// doWindowBorderInterpolation(pInfo, pSDataBlock, numOfOutput, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardRows);
// doWindowBorderInterpolation(pInfo, pSDataBlock, numOfOutput, pInfo->binfo.pCtx, pResult, &nextWin, startPos,
// forwardRows);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
...
@@ -2040,10 +2041,10 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
...
@@ -2040,10 +2041,10 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
int32_t childIndex = getChildIndex(pBlock);
int32_t childIndex = getChildIndex(pBlock);
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex);
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex);
SIntervalAggOperatorInfo* pChildInfo = pChildOp->info;
SIntervalAggOperatorInfo* pChildInfo = pChildOp->info;
doClearWindows
(
&
pChildInfo
->
aggSup
,
&
pChildInfo
->
binfo
,
&
pChildInfo
->
interval
,
doClearWindows(&pChildInfo->aggSup, &pChildInfo->binfo, &pChildInfo->interval,
pChildInfo->primaryTsIndex,
pChildInfo
->
primaryTsIndex
,
pChildOp
->
numOfExprs
,
pBlock
,
NULL
);
pChildOp->numOfExprs, pBlock, NULL);
rebuildIntervalWindow
(
pInfo
,
pUpWins
,
pInfo
->
binfo
.
pRes
->
info
.
groupId
,
rebuildIntervalWindow(pInfo, pUpWins, pInfo->binfo.pRes->info.groupId,
pOperator->numOfExprs,
pOperator
->
numOfExprs
,
pOperator
->
pTaskInfo
);
pOperator->pTaskInfo);
taosArrayDestroy(pUpWins);
taosArrayDestroy(pUpWins);
continue;
continue;
}
}
...
@@ -2073,10 +2074,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
...
@@ -2073,10 +2074,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
}
}
if (isFinalInterval(pInfo)) {
if (isFinalInterval(pInfo)) {
closeIntervalWindow
(
pInfo
->
aggSup
.
pResultRowHashTable
,
&
pInfo
->
twAggSup
,
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pClosed);
&
pInfo
->
interval
,
pClosed
);
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pClosed, pInfo->binfo.rowCellInfoOffset);
finalizeUpdatedResult
(
pOperator
->
numOfExprs
,
pInfo
->
aggSup
.
pResultBuf
,
pClosed
,
pInfo
->
binfo
.
rowCellInfoOffset
);
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
taosArrayAddAll(pUpdated, pClosed);
taosArrayAddAll(pUpdated, pClosed);
}
}
...
@@ -2100,8 +2099,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
...
@@ -2100,8 +2099,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
return pInfo->binfo.pRes;
return pInfo->binfo.pRes;
}
}
SOperatorInfo
*
createStreamFinalIntervalOperatorInfo
(
SOperatorInfo
*
downstream
,
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
SPhysiNode* pPhyNode,
SPhysiNode
*
pPhyNode
,
SExecTaskInfo
*
pTaskInfo
,
int32_t
numOfChild
)
{
SExecTaskInfo* pTaskInfo, int32_t numOfChild) {
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
SStreamFinalIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamFinalIntervalOperatorInfo));
SStreamFinalIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamFinalIntervalOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
...
@@ -2109,25 +2108,26 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
...
@@ -2109,25 +2108,26 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
goto _error;
goto _error;
}
}
pInfo->order = TSDB_ORDER_ASC;
pInfo->order = TSDB_ORDER_ASC;
pInfo
->
interval
=
(
SInterval
)
{.
interval
=
pIntervalPhyNode
->
interval
,
pInfo->interval = (SInterval){.interval = pIntervalPhyNode->interval,
.sliding = pIntervalPhyNode->sliding,
.sliding = pIntervalPhyNode->sliding,
.intervalUnit = pIntervalPhyNode->intervalUnit,
.intervalUnit = pIntervalPhyNode->intervalUnit,
.slidingUnit = pIntervalPhyNode->slidingUnit,
.slidingUnit = pIntervalPhyNode->slidingUnit,
.offset = pIntervalPhyNode->offset,
.offset = pIntervalPhyNode->offset,
.
precision
=
.precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};
((
SColumnNode
*
)
pIntervalPhyNode
->
window
.
pTspk
)
->
node
.
resType
.
precision
};
pInfo->twAggSup = (STimeWindowAggSupp){
pInfo
->
twAggSup
=
(
STimeWindowAggSupp
){
.
waterMark
=
pIntervalPhyNode
->
window
.
watermark
,
.waterMark = pIntervalPhyNode->window.watermark,
.calTrigger = pIntervalPhyNode->window.triggerType,
.calTrigger = pIntervalPhyNode->window.triggerType,
.maxTs = INT64_MIN,
.maxTs = INT64_MIN,
.
winMap
=
NULL
,
};
.winMap = NULL,
};
pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(pOperator, 4096);
initResultSizeInfo(pOperator, 4096);
int32_t numOfCols = 0;
int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols);
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols);
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
int32_t
code
=
initAggInfo
(
&
pInfo
->
binfo
,
&
pInfo
->
aggSup
,
pExprInfo
,
numOfCols
,
int32_t
code =
pResBlock
,
keyBufSize
,
pTaskInfo
->
id
.
str
);
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols,
pResBlock, keyBufSize, pTaskInfo->id.str);
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
if (code != TSDB_CODE_SUCCESS) {
if (code != TSDB_CODE_SUCCESS) {
goto _error;
goto _error;
...
@@ -2149,7 +2149,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
...
@@ -2149,7 +2149,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
if (!isFinalInterval(pInfo)) {
if (!isFinalInterval(pInfo)) {
pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
}
}
pInfo
->
pUpdateRes
=
createResDataBlock
(
pPhyNode
->
pOutputDataBlockDesc
);
\
pInfo->pUpdateRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
pInfo->pUpdateRes->info.type = STREAM_REPROCESS;
pInfo->pUpdateRes->info.type = STREAM_REPROCESS;
blockDataEnsureCapacity(pInfo->pUpdateRes, 128);
blockDataEnsureCapacity(pInfo->pUpdateRes, 128);
pInfo->pPhyNode = nodesCloneNode(pPhyNode);
pInfo->pPhyNode = nodesCloneNode(pPhyNode);
...
@@ -2163,9 +2163,9 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
...
@@ -2163,9 +2163,9 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pOperator->numOfExprs = numOfCols;
pOperator->numOfExprs = numOfCols;
pOperator->info = pInfo;
pOperator->info = pInfo;
pOperator
->
fpSet
=
createOperatorFpSet
(
NULL
,
doStreamFinalIntervalAgg
,
NULL
,
NULL
,
pOperator->fpSet =
destroyStreamFinalIntervalOperatorInfo
,
aggEncodeResultRow
,
aggDecodeResultRow
,
createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, NULL, destroyStreamFinalIntervalOperatorInfo
,
NULL
);
aggEncodeResultRow, aggDecodeResultRow,
NULL);
code = appendDownstream(pOperator, &downstream, 1);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
if (code != TSDB_CODE_SUCCESS) {
...
@@ -2205,8 +2205,7 @@ void destroyStreamSessionAggOperatorInfo(void* param, int32_t numOfOutput) {
...
@@ -2205,8 +2205,7 @@ void destroyStreamSessionAggOperatorInfo(void* param, int32_t numOfOutput) {
}
}
}
}
int32_t
initBiasicInfo
(
SOptrBasicInfo
*
pBasicInfo
,
SExprInfo
*
pExprInfo
,
int32_t initBiasicInfo(SOptrBasicInfo* pBasicInfo, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock) {
int32_t
numOfCols
,
SSDataBlock
*
pResultBlock
)
{
pBasicInfo->pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pBasicInfo->rowCellInfoOffset);
pBasicInfo->pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pBasicInfo->rowCellInfoOffset);
pBasicInfo->pRes = pResultBlock;
pBasicInfo->pRes = pResultBlock;
for (int32_t i = 0; i < numOfCols; ++i) {
for (int32_t i = 0; i < numOfCols; ++i) {
...
@@ -3182,7 +3181,7 @@ typedef struct SMergeIntervalAggOperatorInfo {
...
@@ -3182,7 +3181,7 @@ typedef struct SMergeIntervalAggOperatorInfo {
SHashObj* groupIntervalHash;
SHashObj* groupIntervalHash;
bool hasGroupId;
bool hasGroupId;
uint64_t groupId;
uint64_t groupId;
SSDataBlock
*
prefetchedBlock
;
SSDataBlock
*
prefetchedBlock;
} SMergeIntervalAggOperatorInfo;
} SMergeIntervalAggOperatorInfo;
void destroyMergeIntervalOperatorInfo(void* param, int32_t numOfOutput) {
void destroyMergeIntervalOperatorInfo(void* param, int32_t numOfOutput) {
...
@@ -3191,13 +3190,14 @@ void destroyMergeIntervalOperatorInfo(void* param, int32_t numOfOutput) {
...
@@ -3191,13 +3190,14 @@ void destroyMergeIntervalOperatorInfo(void* param, int32_t numOfOutput) {
destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo, numOfOutput);
destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo, numOfOutput);
}
}
static
int32_t
outputPrevIntervalResult
(
SOperatorInfo
*
pOperatorInfo
,
uint64_t
tableGroupId
,
SSDataBlock
*
pResultBlock
,
STimeWindow
*
newWin
)
{
static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, SSDataBlock* pResultBlock,
SMergeIntervalAggOperatorInfo
*
miaInfo
=
pOperatorInfo
->
info
;
STimeWindow* newWin) {
SIntervalAggOperatorInfo
*
iaInfo
=
&
miaInfo
->
intervalAggOperatorInfo
;
SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo;
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
bool ascScan = (iaInfo->order == TSDB_ORDER_ASC);
bool ascScan = (iaInfo->order == TSDB_ORDER_ASC);
STimeWindow
*
prevWin
=
taosHashGet
(
miaInfo
->
groupIntervalHash
,
&
tableGroupId
,
sizeof
(
tableGroupId
));
STimeWindow
* prevWin
= taosHashGet(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId));
if (prevWin == NULL) {
if (prevWin == NULL) {
taosHashPut(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId), newWin, sizeof(STimeWindow));
taosHashPut(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId), newWin, sizeof(STimeWindow));
return 0;
return 0;
...
@@ -3205,8 +3205,8 @@ static int32_t outputPrevIntervalResult(SOperatorInfo * pOperatorInfo, uint64_t
...
@@ -3205,8 +3205,8 @@ static int32_t outputPrevIntervalResult(SOperatorInfo * pOperatorInfo, uint64_t
if (ascScan && newWin->skey > prevWin->ekey || (!ascScan) && newWin->skey < prevWin->ekey) {
if (ascScan && newWin->skey > prevWin->ekey || (!ascScan) && newWin->skey < prevWin->ekey) {
SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &prevWin->skey, TSDB_KEYSIZE, tableGroupId);
SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &prevWin->skey, TSDB_KEYSIZE, tableGroupId);
SResultRowPosition
*
p1
=
SResultRowPosition* p1 =
(SResultRowPosition*)taosHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf,
(
SResultRowPosition
*
)
taosHashGet
(
iaInfo
->
aggSup
.
pResultRowHashTable
,
iaInfo
->
aggSup
.
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
TSDB_KEYSIZE
));
GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
ASSERT(p1 != NULL);
ASSERT(p1 != NULL);
finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, iaInfo->binfo.pCtx, pOperatorInfo->pExpr,
finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, iaInfo->binfo.pCtx, pOperatorInfo->pExpr,
...
@@ -3221,8 +3221,8 @@ static int32_t outputPrevIntervalResult(SOperatorInfo * pOperatorInfo, uint64_t
...
@@ -3221,8 +3221,8 @@ static int32_t outputPrevIntervalResult(SOperatorInfo * pOperatorInfo, uint64_t
static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
int32_t scanFlag, SSDataBlock* pResultBlock) {
int32_t scanFlag, SSDataBlock* pResultBlock) {
SMergeIntervalAggOperatorInfo
*
miaInfo
=
pOperatorInfo
->
info
;
SMergeIntervalAggOperatorInfo
*
miaInfo = pOperatorInfo->info;
SIntervalAggOperatorInfo
*
iaInfo
=
&
miaInfo
->
intervalAggOperatorInfo
;
SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo;
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
...
@@ -3255,10 +3255,9 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
...
@@ -3255,10 +3255,9 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);
doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);
// restore current time window
// restore current time window
ret
=
ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
setTimeWindowOutputBuf
(
pResultRowInfo
,
&
win
,
(
scanFlag
==
MAIN_SCAN
),
&
pResult
,
tableGroupId
,
iaInfo->binfo.pCtx, numOfOutput, iaInfo->binfo.rowCellInfoOffset, &iaInfo->aggSup,
iaInfo
->
binfo
.
pCtx
,
pTaskInfo);
numOfOutput
,
iaInfo
->
binfo
.
rowCellInfoOffset
,
&
iaInfo
->
aggSup
,
pTaskInfo
);
if (ret != TSDB_CODE_SUCCESS) {
if (ret != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
}
...
@@ -3314,11 +3313,10 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
...
@@ -3314,11 +3313,10 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
}
}
static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SMergeIntervalAggOperatorInfo* miaInfo = pOperator->info;
SMergeIntervalAggOperatorInfo* miaInfo = pOperator->info;
SIntervalAggOperatorInfo
*
iaInfo
=
&
miaInfo
->
intervalAggOperatorInfo
;
SIntervalAggOperatorInfo
*
iaInfo = &miaInfo->intervalAggOperatorInfo;
if (pOperator->status == OP_EXEC_DONE) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
return NULL;
}
}
...
@@ -3329,7 +3327,7 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
...
@@ -3329,7 +3327,7 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
SOperatorInfo* downstream = pOperator->pDownstream[0];
SOperatorInfo* downstream = pOperator->pDownstream[0];
int32_t scanFlag = MAIN_SCAN;
int32_t scanFlag = MAIN_SCAN;
while (1) {
while (1) {
SSDataBlock
*
pBlock
=
NULL
;
SSDataBlock
*
pBlock = NULL;
if (miaInfo->prefetchedBlock == NULL) {
if (miaInfo->prefetchedBlock == NULL) {
pBlock = downstream->fpSet.getNextFn(downstream);
pBlock = downstream->fpSet.getNextFn(downstream);
} else {
} else {
...
@@ -3341,7 +3339,6 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
...
@@ -3341,7 +3339,6 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
break;
break;
}
}
if (!miaInfo->hasGroupId) {
if (!miaInfo->hasGroupId) {
miaInfo->hasGroupId = true;
miaInfo->hasGroupId = true;
miaInfo->groupId = pBlock->info.groupId;
miaInfo->groupId = pBlock->info.groupId;
...
@@ -3376,21 +3373,20 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
...
@@ -3376,21 +3373,20 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
STimeWindowAggSupp
*
pTwAggSupp
,
SExecTaskInfo
*
pTaskInfo
)
{
SExecTaskInfo* pTaskInfo) {
SMergeIntervalAggOperatorInfo* miaInfo = taosMemoryCalloc(1, sizeof(SMergeIntervalAggOperatorInfo));
SMergeIntervalAggOperatorInfo* miaInfo = taosMemoryCalloc(1, sizeof(SMergeIntervalAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (miaInfo == NULL || pOperator == NULL) {
if (miaInfo == NULL || pOperator == NULL) {
goto _error;
goto _error;
}
}
SIntervalAggOperatorInfo
*
iaInfo
=
&
miaInfo
->
intervalAggOperatorInfo
;
SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo;
iaInfo->win = pTaskInfo->window;
iaInfo->win = pTaskInfo->window;
iaInfo->order = TSDB_ORDER_ASC;
iaInfo->order = TSDB_ORDER_ASC;
iaInfo->interval = *pInterval;
iaInfo->interval = *pInterval;
iaInfo->execModel = pTaskInfo->execModel;
iaInfo->execModel = pTaskInfo->execModel;
iaInfo
->
twAggSup
=
*
pTwAggSupp
;
iaInfo->primaryTsIndex = primaryTsSlotId;
iaInfo->primaryTsIndex = primaryTsSlotId;
miaInfo->groupIntervalHash = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_NO_LOCK);
miaInfo->groupIntervalHash = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_NO_LOCK);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录