未验证 提交 00d64317 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #16939 from taosdata/feature/TD-18820

feat(steam):optimize window close
...@@ -1064,6 +1064,7 @@ bool functionNeedToExecute(SqlFunctionCtx* pCtx); ...@@ -1064,6 +1064,7 @@ bool functionNeedToExecute(SqlFunctionCtx* pCtx);
bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup); bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup);
bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup); bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup);
bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup); bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup);
bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SOperatorInfo* pOperator, STimeWindowAggSupp* pTwSup);
void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid, uint64_t* pGp); void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid, uint64_t* pGp);
void printDataBlock(SSDataBlock* pBlock, const char* flag); void printDataBlock(SSDataBlock* pBlock, const char* flag);
uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId); uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId);
......
...@@ -1331,7 +1331,7 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock ...@@ -1331,7 +1331,7 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock
// must check update info first. // must check update info first.
bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, tsCol[rowId]); bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, tsCol[rowId]);
bool closedWin = isClosed && isSignleIntervalWindow(pInfo) && bool closedWin = isClosed && isSignleIntervalWindow(pInfo) &&
isDeletedWindow(&win, pBlock->info.groupId, pInfo->windowSup.pIntervalAggSup); isDeletedStreamWindow(&win, pBlock->info.groupId, pInfo->pTableScanOp, &pInfo->twAggSup);
if ((update || closedWin) && out) { if ((update || closedWin) && out) {
qDebug("stream update check not pass, update %d, closedWin %d", update, closedWin); qDebug("stream update check not pass, update %d, closedWin %d", update, closedWin);
uint64_t gpId = closedWin && pInfo->partitionSup.needCalc uint64_t gpId = closedWin && pInfo->partitionSup.needCalc
...@@ -1931,11 +1931,6 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys ...@@ -1931,11 +1931,6 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo->pTagCond = pTagCond; pInfo->pTagCond = pTagCond;
pInfo->pGroupTags = pTableScanNode->pGroupTags; pInfo->pGroupTags = pTableScanNode->pGroupTags;
pInfo->twAggSup = (STimeWindowAggSupp){
.waterMark = pTableScanNode->watermark,
.calTrigger = pTableScanNode->triggerType,
.maxTs = INT64_MIN,
};
int32_t numOfCols = 0; int32_t numOfCols = 0;
pInfo->pColMatchInfo = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID); pInfo->pColMatchInfo = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
...@@ -1985,7 +1980,6 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys ...@@ -1985,7 +1980,6 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo->pUpdateInfo = NULL; pInfo->pUpdateInfo = NULL;
pInfo->pTableScanOp = pTableScanOp; pInfo->pTableScanOp = pTableScanOp;
pInfo->interval = pTSInfo->pdInfo.interval;
pInfo->readHandle = *pHandle; pInfo->readHandle = *pHandle;
pInfo->tableUid = pScanPhyNode->uid; pInfo->tableUid = pScanPhyNode->uid;
......
...@@ -1753,16 +1753,17 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SInt ...@@ -1753,16 +1753,17 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SInt
} }
void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SAggSupporter* pSup, SInterval* pInterval, void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SAggSupporter* pSup, SInterval* pInterval,
int64_t waterMark) { STimeWindowAggSupp* pTwSup) {
if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
initIntervalDownStream(downstream->pDownstream[0], type, pSup, pInterval, waterMark); initIntervalDownStream(downstream->pDownstream[0], type, pSup, pInterval, pTwSup);
return; return;
} }
SStreamScanInfo* pScanInfo = downstream->info; SStreamScanInfo* pScanInfo = downstream->info;
pScanInfo->windowSup.parentType = type; pScanInfo->windowSup.parentType = type;
pScanInfo->windowSup.pIntervalAggSup = pSup; pScanInfo->windowSup.pIntervalAggSup = pSup;
pScanInfo->pUpdateInfo = updateInfoInitP(pInterval, waterMark); pScanInfo->pUpdateInfo = updateInfoInitP(pInterval, pTwSup->waterMark);
pScanInfo->interval = *pInterval; pScanInfo->interval = *pInterval;
pScanInfo->twAggSup = *pTwSup;
} }
void initStreamFunciton(SqlFunctionCtx* pCtx, int32_t numOfExpr) { void initStreamFunciton(SqlFunctionCtx* pCtx, int32_t numOfExpr) {
...@@ -1847,11 +1848,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -1847,11 +1848,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, NULL, NULL, pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, NULL, NULL,
destroyIntervalOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); destroyIntervalOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
if (nodeType(pPhyNode) == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL) {
initIntervalDownStream(downstream, QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, &pInfo->aggSup, &pInfo->interval,
pInfo->twAggSup.waterMark);
}
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
...@@ -2868,6 +2864,19 @@ bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup) { ...@@ -2868,6 +2864,19 @@ bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup) {
return p1 == NULL; return p1 == NULL;
} }
bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SOperatorInfo* pOperator, STimeWindowAggSupp* pTwSup) {
if (pWin->ekey < pTwSup->maxTs - pTwSup->deleteMark) {
SWinKey key = {.ts = pWin->skey, .groupId = groupId};
void* pVal = NULL;
int32_t size = 0;
if (streamStateGet(pOperator->pTaskInfo->streamInfo.pState, &key, &pVal, &size) < 0) {
return false;
}
streamStateReleaseBuf(pOperator->pTaskInfo->streamInfo.pState, &key, pVal);
}
return false;
}
int32_t getNexWindowPos(SInterval* pInterval, SDataBlockInfo* pBlockInfo, TSKEY* tsCols, int32_t startPos, TSKEY eKey, int32_t getNexWindowPos(SInterval* pInterval, SDataBlockInfo* pBlockInfo, TSKEY* tsCols, int32_t startPos, TSKEY eKey,
STimeWindow* pNextWin) { STimeWindow* pNextWin) {
int32_t forwardRows = int32_t forwardRows =
...@@ -3425,7 +3434,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -3425,7 +3434,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, NULL, destroyStreamFinalIntervalOperatorInfo, createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, NULL, destroyStreamFinalIntervalOperatorInfo,
aggEncodeResultRow, aggDecodeResultRow, NULL); aggEncodeResultRow, aggDecodeResultRow, NULL);
if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) { if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) {
initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, pInfo->twAggSup.waterMark); initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, &pInfo->twAggSup);
} }
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -5944,7 +5953,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys ...@@ -5944,7 +5953,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
createOperatorFpSet(operatorDummyOpenFn, doStreamIntervalAgg, NULL, NULL, destroyStreamIntervalOperatorInfo, createOperatorFpSet(operatorDummyOpenFn, doStreamIntervalAgg, NULL, NULL, destroyStreamIntervalOperatorInfo,
aggEncodeResultRow, aggDecodeResultRow, NULL); aggEncodeResultRow, aggDecodeResultRow, NULL);
initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, pInfo->twAggSup.waterMark); initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, &pInfo->twAggSup);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册