提交 0941a464 编写于 作者: S shenglian zhou

fix: add merge interval operator for select mode(expr) + interval

上级 84ea04ea
......@@ -226,6 +226,7 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_MERGE,
QUERY_NODE_PHYSICAL_PLAN_SORT,
QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL,
......
......@@ -375,6 +375,7 @@ typedef struct SIntervalPhysiNode {
int8_t slidingUnit;
} SIntervalPhysiNode;
typedef SIntervalPhysiNode SMergeIntervalPhysiNode;
typedef SIntervalPhysiNode SMergeAlignedIntervalPhysiNode;
typedef SIntervalPhysiNode SStreamIntervalPhysiNode;
typedef SIntervalPhysiNode SStreamFinalIntervalPhysiNode;
......
......@@ -729,6 +729,10 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo, bool isStream);
SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
SExecTaskInfo* pTaskInfo);
......
......@@ -2765,7 +2765,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
goto _error;
}
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
code = doInitAggInfoSup(&pInfo->aggSup, pOperator->exprSupp.pCtx, num, keyBufSize, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
......@@ -2787,9 +2787,9 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
pOperator->name = "SortedMerge";
// pOperator->operatorType = OP_SortedMerge;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSortedMerge, NULL, NULL, destroySortedMergeOperatorInfo,
......@@ -2844,8 +2844,8 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SAggOperatorInfo* pAggInfo = pOperator->info;
SExprSupp* pSup = &pOperator->exprSupp;
SOperatorInfo* downstream = pOperator->pDownstream[0];
SExprSupp* pSup = &pOperator->exprSupp;
SOperatorInfo* downstream = pOperator->pDownstream[0];
int64_t st = taosGetTimestampUs();
......@@ -3736,9 +3736,9 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
SExprInfo* pExprInfo = createExprInfo(pPhyNode->pFuncs, NULL, &numOfExpr);
if (pPhyNode->pExprs != NULL) {
int32_t num = 0;
int32_t num = 0;
SExprInfo* pSExpr = createExprInfo(pPhyNode->pExprs, NULL, &num);
int32_t code = initExprSupp(&pInfo->scalarSup, pSExpr, num);
int32_t code = initExprSupp(&pInfo->scalarSup, pSExpr, num);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
......@@ -3761,14 +3761,14 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr);
pInfo->binfo.pRes = pResBlock;
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr);
pInfo->binfo.pRes = pResBlock;
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr);
pOperator->name = "IndefinitOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->exprSupp.numOfExprs = numOfExpr;
pOperator->pTaskInfo = pTaskInfo;
......@@ -3838,10 +3838,10 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL;
pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->exprSupp.numOfExprs = num;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->exprSupp.numOfExprs = num;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, NULL, destroySFillOperatorInfo, NULL, NULL, NULL);
......@@ -4087,7 +4087,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) {
SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*) pPhyNode;
SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode;
pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo));
if (pBlockNode->tableType == TSDB_SUPER_TABLE) {
......@@ -4205,6 +4205,21 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
pOptr = createMergeAlignedIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) {
SMergeIntervalPhysiNode* pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode;
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
SInterval interval = {.interval = pIntervalPhyNode->interval,
.sliding = pIntervalPhyNode->sliding,
.intervalUnit = pIntervalPhyNode->intervalUnit,
.slidingUnit = pIntervalPhyNode->slidingUnit,
.offset = pIntervalPhyNode->offset,
.precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};
int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
pOptr = createMergeIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) {
int32_t children = 0;
pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
......
......@@ -1262,7 +1262,7 @@ static int32_t getAllIntervalWindow(SHashObj* pHashMap, SArray* resWins) {
return TSDB_CODE_SUCCESS;
}
bool isCloseWindow(STimeWindow *pWin, STimeWindowAggSupp* pSup) {
bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup) {
ASSERT(pSup->maxTs == INT64_MIN || pSup->maxTs > 0);
return pSup->maxTs != INT64_MIN && pWin->ekey < pSup->maxTs - pSup->waterMark;
}
......@@ -2402,7 +2402,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
initResultRowInfo(&pInfo->binfo.resultRowInfo);
pInfo->pChildren = NULL;
if (numOfChild > 0) {
pInfo->pChildren = taosArrayInit(numOfChild, sizeof(void *));
pInfo->pChildren = taosArrayInit(numOfChild, sizeof(void*));
for (int32_t i = 0; i < numOfChild; i++) {
SOperatorInfo* pChildOp = createStreamFinalIntervalOperatorInfo(NULL, pPhyNode, pTaskInfo, 0);
if (pChildOp) {
......@@ -3681,11 +3681,11 @@ void destroyMergeAlignedIntervalOperatorInfo(void* param, int32_t numOfOutput) {
destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo, numOfOutput);
}
static int32_t outputMergeAlignedIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, SSDataBlock* pResultBlock,
TSKEY wstartTs) {
static int32_t outputMergeAlignedIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId,
SSDataBlock* pResultBlock, TSKEY wstartTs) {
SMergeAlignedIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo;
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo;
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
SExprSupp* pSup = &pOperatorInfo->exprSupp;
bool ascScan = (iaInfo->order == TSDB_ORDER_ASC);
......@@ -3702,10 +3702,10 @@ static int32_t outputMergeAlignedIntervalResult(SOperatorInfo* pOperatorInfo, ui
return 0;
}
static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
int32_t scanFlag, SSDataBlock* pResultBlock) {
static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo,
SSDataBlock* pBlock, int32_t scanFlag, SSDataBlock* pResultBlock) {
SMergeAlignedIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo;
SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo;
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
SExprSupp* pSup = &pOperatorInfo->exprSupp;
......@@ -3748,10 +3748,9 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
currTs = tsCols[currPos];
currWin.skey = currTs;
currWin.ekey = taosTimeAdd(currWin.skey,
iaInfo->interval.interval,
iaInfo->interval.intervalUnit,
iaInfo->interval.precision) - 1;
currWin.ekey = taosTimeAdd(currWin.skey, iaInfo->interval.interval, iaInfo->interval.intervalUnit,
iaInfo->interval.precision) -
1;
startPos = currPos;
ret = setTimeWindowOutputBuf(pResultRowInfo, &currWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
......@@ -3771,7 +3770,7 @@ static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SMergeAlignedIntervalAggOperatorInfo* miaInfo = pOperator->info;
SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo;
SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
......@@ -3828,11 +3827,11 @@ static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
return (rows == 0) ? NULL : pRes;
}
SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
SExecTaskInfo* pTaskInfo) {
SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo,
int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval,
int32_t primaryTsSlotId, SExecTaskInfo* pTaskInfo) {
SMergeAlignedIntervalAggOperatorInfo* miaInfo = taosMemoryCalloc(1, sizeof(SMergeAlignedIntervalAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (miaInfo == NULL || pOperator == NULL) {
goto _error;
}
......@@ -3892,3 +3891,280 @@ _error:
pTaskInfo->code = code;
return NULL;
}
//=====================================================================================================================
// merge interval operator
typedef struct SMergeIntervalAggOperatorInfo {
SIntervalAggOperatorInfo intervalAggOperatorInfo;
SHashObj* groupIntervalHash;
bool hasGroupId;
uint64_t groupId;
SSDataBlock* prefetchedBlock;
bool inputBlocksFinished;
} SMergeIntervalAggOperatorInfo;
void destroyMergeIntervalOperatorInfo(void* param, int32_t numOfOutput) {
SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param;
taosHashCleanup(miaInfo->groupIntervalHash);
destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo, numOfOutput);
}
static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, SSDataBlock* pResultBlock,
STimeWindow* newWin) {
SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo;
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
bool ascScan = (iaInfo->order == TSDB_ORDER_ASC);
SExprSupp* pExprSup = &pOperatorInfo->exprSupp;
STimeWindow* prevWin = taosHashGet(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId));
if (prevWin == NULL) {
taosHashPut(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId), newWin, sizeof(STimeWindow));
return 0;
}
if (newWin == NULL || (ascScan && newWin->skey > prevWin->ekey || (!ascScan) && newWin->skey < prevWin->ekey)) {
SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &prevWin->skey, TSDB_KEYSIZE, tableGroupId);
SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf,
GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
ASSERT(p1 != NULL);
finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, pExprSup->pCtx, pExprSup->pExprInfo,
pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset, pResultBlock, pTaskInfo);
taosHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
if (newWin == NULL) {
taosHashRemove(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId));
} else {
taosHashPut(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId), newWin, sizeof(STimeWindow));
}
}
return 0;
}
static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
int32_t scanFlag, SSDataBlock* pResultBlock) {
SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo;
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
SExprSupp* pExprSup = &pOperatorInfo->exprSupp;
int32_t startPos = 0;
int32_t numOfOutput = pExprSup->numOfExprs;
int64_t* tsCols = extractTsCol(pBlock, iaInfo);
uint64_t tableGroupId = pBlock->info.groupId;
bool ascScan = (iaInfo->order == TSDB_ORDER_ASC);
TSKEY blockStartTs = getStartTsKey(&pBlock->info.window, tsCols);
SResultRow* pResult = NULL;
STimeWindow win = getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval,
iaInfo->interval.precision, &iaInfo->win);
int32_t ret =
setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx,
numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
TSKEY ekey = ascScan ? win.ekey : win.skey;
int32_t forwardRows =
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, iaInfo->order);
ASSERT(forwardRows > 0);
// prev time window not interpolation yet.
if (iaInfo->timeWindowInterpo) {
SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult);
doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);
// restore current time window
ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx,
numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
// window start key interpolation
doWindowBorderInterpolation(iaInfo, pBlock, pResult, &win, startPos, forwardRows, pExprSup);
}
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, true);
doApplyFunctions(pTaskInfo, pExprSup->pCtx, &win, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols,
pBlock->info.rows, numOfOutput, iaInfo->order);
doCloseWindow(pResultRowInfo, iaInfo, pResult);
// output previous interval results after this interval (&win) is closed
outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &win);
STimeWindow nextWin = win;
while (1) {
int32_t prevEndPos = forwardRows - 1 + startPos;
startPos = getNextQualifiedWindow(&iaInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, iaInfo->order);
if (startPos < 0) {
break;
}
// null data, failed to allocate more memory buffer
int32_t code =
setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
pExprSup->pCtx, numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
ekey = ascScan ? nextWin.ekey : nextWin.skey;
forwardRows =
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, iaInfo->order);
// window start(end) key interpolation
doWindowBorderInterpolation(iaInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pExprSup);
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, true);
doApplyFunctions(pTaskInfo, pExprSup->pCtx, &nextWin, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows,
tsCols, pBlock->info.rows, numOfOutput, iaInfo->order);
doCloseWindow(pResultRowInfo, iaInfo, pResult);
// output previous interval results after this interval (&nextWin) is closed
outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &nextWin);
}
if (iaInfo->timeWindowInterpo) {
saveDataBlockLastRow(iaInfo->pPrevValues, pBlock, iaInfo->pInterpCols);
}
}
static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SMergeIntervalAggOperatorInfo* miaInfo = pOperator->info;
SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo;
SExprSupp* pExpSupp = &pOperator->exprSupp;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SSDataBlock* pRes = iaInfo->binfo.pRes;
blockDataCleanup(pRes);
blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity);
if (!miaInfo->inputBlocksFinished) {
SOperatorInfo* downstream = pOperator->pDownstream[0];
int32_t scanFlag = MAIN_SCAN;
while (1) {
SSDataBlock* pBlock = NULL;
if (miaInfo->prefetchedBlock == NULL) {
pBlock = downstream->fpSet.getNextFn(downstream);
} else {
pBlock = miaInfo->prefetchedBlock;
miaInfo->groupId = pBlock->info.groupId;
miaInfo->prefetchedBlock = NULL;
}
if (pBlock == NULL) {
miaInfo->inputBlocksFinished = true;
break;
}
if (!miaInfo->hasGroupId) {
miaInfo->hasGroupId = true;
miaInfo->groupId = pBlock->info.groupId;
} else if (miaInfo->groupId != pBlock->info.groupId) {
miaInfo->prefetchedBlock = pBlock;
break;
}
getTableScanInfo(pOperator, &iaInfo->order, &scanFlag);
setInputDataBlock(pOperator, pExpSupp->pCtx, pBlock, iaInfo->order, scanFlag, true);
doMergeIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, scanFlag, pRes);
if (pRes->info.rows >= pOperator->resultInfo.threshold) {
break;
}
}
pRes->info.groupId = miaInfo->groupId;
} else {
void* p = taosHashIterate(miaInfo->groupIntervalHash, NULL);
if (p != NULL) {
size_t len = 0;
uint64_t* pKey = taosHashGetKey(p, &len);
outputPrevIntervalResult(pOperator, *pKey, pRes, NULL);
}
}
if (pRes->info.rows == 0) {
doSetOperatorCompleted(pOperator);
}
size_t rows = pRes->info.rows;
pOperator->resultInfo.totalRows += rows;
return (rows == 0) ? NULL : pRes;
}
SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
SExecTaskInfo* pTaskInfo) {
SMergeIntervalAggOperatorInfo* miaInfo = taosMemoryCalloc(1, sizeof(SMergeIntervalAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (miaInfo == NULL || pOperator == NULL) {
goto _error;
}
SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo;
iaInfo->win = pTaskInfo->window;
iaInfo->order = TSDB_ORDER_ASC;
iaInfo->interval = *pInterval;
iaInfo->execModel = pTaskInfo->execModel;
iaInfo->primaryTsIndex = primaryTsSlotId;
SExprSupp* pExprSupp = &pOperator->exprSupp;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(pOperator, 4096);
int32_t code = initAggInfo(pExprSupp, &iaInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
initBasicInfo(&iaInfo->binfo, pResBlock);
initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win);
iaInfo->timeWindowInterpo = timeWindowinterpNeeded(pExprSupp->pCtx, numOfCols, iaInfo);
if (iaInfo->timeWindowInterpo) {
iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SResultRowPosition));
if (iaInfo->binfo.resultRowInfo.openWindow == NULL) {
goto _error;
}
}
initResultRowInfo(&iaInfo->binfo.resultRowInfo);
pOperator->name = "TimeMergeIntervalAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->info = miaInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doMergeIntervalAgg, NULL, NULL,
destroyMergeIntervalOperatorInfo, NULL, NULL, NULL);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
return pOperator;
_error:
destroyMergeIntervalOperatorInfo(miaInfo, numOfCols);
taosMemoryFreeClear(miaInfo);
taosMemoryFreeClear(pOperator);
pTaskInfo->code = code;
return NULL;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册