未验证 提交 1de6903f 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #15722 from taosdata/feature/stream

enh(stream): stream query is not limited by window count
...@@ -301,7 +301,8 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR ...@@ -301,7 +301,8 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset}; pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
// too many time window in query // too many time window in query
if (taosHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) { if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH &&
taosHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW); longjmp(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
} }
...@@ -3598,7 +3599,8 @@ void doDestroyExchangeOperatorInfo(void* param) { ...@@ -3598,7 +3599,8 @@ void doDestroyExchangeOperatorInfo(void* param) {
} }
static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, SNodeListNode* pValNode, static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, SNodeListNode* pValNode,
STimeWindow win, int32_t capacity, const char* id, SInterval* pInterval, int32_t fillType, int32_t order) { STimeWindow win, int32_t capacity, const char* id, SInterval* pInterval, int32_t fillType,
int32_t order) {
SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pValNode); SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pValNode);
STimeWindow w = getAlignQueryTimeWindow(pInterval, pInterval->precision, win.skey); STimeWindow w = getAlignQueryTimeWindow(pInterval, pInterval->precision, win.skey);
...@@ -3635,7 +3637,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* ...@@ -3635,7 +3637,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval ? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval
: &((SIntervalAggOperatorInfo*)downstream->info)->interval; : &((SIntervalAggOperatorInfo*)downstream->info)->interval;
int32_t order = (pPhyFillNode->inputTsOrder == ORDER_ASC)? TSDB_ORDER_ASC:TSDB_ORDER_DESC; int32_t order = (pPhyFillNode->inputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
int32_t type = convertFillType(pPhyFillNode->mode); int32_t type = convertFillType(pPhyFillNode->mode);
SResultInfo* pResultInfo = &pOperator->resultInfo; SResultInfo* pResultInfo = &pOperator->resultInfo;
...@@ -3833,7 +3835,7 @@ static int32_t sortTableGroup(STableListInfo* pTableListInfo, int32_t groupNum) ...@@ -3833,7 +3835,7 @@ static int32_t sortTableGroup(STableListInfo* pTableListInfo, int32_t groupNum)
return TDB_CODE_SUCCESS; return TDB_CODE_SUCCESS;
} }
bool groupbyTbname(SNodeList* pGroupList) { bool groupbyTbname(SNodeList* pGroupList) {
bool bytbname = false; bool bytbname = false;
if (LIST_LENGTH(pGroupList) > 0) { if (LIST_LENGTH(pGroupList) > 0) {
SNode* p = nodesListGetNode(pGroupList, 0); SNode* p = nodesListGetNode(pGroupList, 0);
...@@ -3875,7 +3877,7 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, ...@@ -3875,7 +3877,7 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
bool assignUid = groupbyTbname(group); bool assignUid = groupbyTbname(group);
int32_t groupNum = 0; int32_t groupNum = 0;
size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList); size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
for (int32_t i = 0; i < numOfTables; i++) { for (int32_t i = 0; i < numOfTables; i++) {
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
...@@ -4608,7 +4610,7 @@ void releaseQueryBuf(size_t numOfTables) { ...@@ -4608,7 +4610,7 @@ void releaseQueryBuf(size_t numOfTables) {
} }
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList) { int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList) {
SExplainExecInfo execInfo = {0}; SExplainExecInfo execInfo = {0};
SExplainExecInfo* pExplainInfo = taosArrayPush(pExecInfoList, &execInfo); SExplainExecInfo* pExplainInfo = taosArrayPush(pExecInfoList, &execInfo);
pExplainInfo->numOfRows = operatorInfo->resultInfo.totalRows; pExplainInfo->numOfRows = operatorInfo->resultInfo.totalRows;
...@@ -4618,7 +4620,8 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInf ...@@ -4618,7 +4620,8 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInf
pExplainInfo->verboseInfo = NULL; pExplainInfo->verboseInfo = NULL;
if (operatorInfo->fpSet.getExplainFn) { if (operatorInfo->fpSet.getExplainFn) {
int32_t code = operatorInfo->fpSet.getExplainFn(operatorInfo, &pExplainInfo->verboseInfo, &pExplainInfo->verboseLen); int32_t code =
operatorInfo->fpSet.getExplainFn(operatorInfo, &pExplainInfo->verboseInfo, &pExplainInfo->verboseLen);
if (code) { if (code) {
qError("%s operator getExplainFn failed, code:%s", GET_TASKID(operatorInfo->pTaskInfo), tstrerror(code)); qError("%s operator getExplainFn failed, code:%s", GET_TASKID(operatorInfo->pTaskInfo), tstrerror(code));
return code; return code;
...@@ -4629,7 +4632,7 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInf ...@@ -4629,7 +4632,7 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInf
for (int32_t i = 0; i < operatorInfo->numOfDownstream; ++i) { for (int32_t i = 0; i < operatorInfo->numOfDownstream; ++i) {
code = getOperatorExplainExecInfo(operatorInfo->pDownstream[i], pExecInfoList); code = getOperatorExplainExecInfo(operatorInfo->pDownstream[i], pExecInfoList);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
// taosMemoryFreeClear(*pRes); // taosMemoryFreeClear(*pRes);
return TSDB_CODE_QRY_OUT_OF_MEMORY; return TSDB_CODE_QRY_OUT_OF_MEMORY;
} }
} }
......
...@@ -56,6 +56,8 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys ...@@ -56,6 +56,8 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
goto _error; goto _error;
} }
pOperator->pTaskInfo = pTaskInfo;
int32_t numOfCols = 0; int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pProjPhyNode->pProjections, NULL, &numOfCols); SExprInfo* pExprInfo = createExprInfo(pProjPhyNode->pProjections, NULL, &numOfCols);
...@@ -63,7 +65,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys ...@@ -63,7 +65,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
initLimitInfo(pProjPhyNode->node.pLimit, pProjPhyNode->node.pSlimit, &pInfo->limitInfo); initLimitInfo(pProjPhyNode->node.pLimit, pProjPhyNode->node.pSlimit, &pInfo->limitInfo);
pInfo->binfo.pRes = pResBlock; pInfo->binfo.pRes = pResBlock;
pInfo->pFinalRes = createOneDataBlock(pResBlock, false); pInfo->pFinalRes = createOneDataBlock(pResBlock, false);
pInfo->pFilterNode = pProjPhyNode->node.pConditions; pInfo->pFilterNode = pProjPhyNode->node.pConditions;
pInfo->mergeDataBlocks = pProjPhyNode->mergeDataBlock; pInfo->mergeDataBlocks = pProjPhyNode->mergeDataBlock;
...@@ -73,7 +75,6 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys ...@@ -73,7 +75,6 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
pInfo->mergeDataBlocks = false; pInfo->mergeDataBlocks = false;
} }
int32_t numOfRows = 4096; int32_t numOfRows = 4096;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
...@@ -89,12 +90,11 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys ...@@ -89,12 +90,11 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfCols); setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfCols);
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfCols); pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfCols);
pOperator->name = "ProjectOperator"; pOperator->name = "ProjectOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT;
pOperator->blocking = false; pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL, NULL, pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL, NULL,
destroyProjectOperatorInfo, NULL, NULL, NULL); destroyProjectOperatorInfo, NULL, NULL, NULL);
...@@ -106,7 +106,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys ...@@ -106,7 +106,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
return pOperator; return pOperator;
_error: _error:
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
...@@ -156,7 +156,8 @@ static int32_t setInfoForNewGroup(SSDataBlock* pBlock, SLimitInfo* pLimitInfo, S ...@@ -156,7 +156,8 @@ static int32_t setInfoForNewGroup(SSDataBlock* pBlock, SLimitInfo* pLimitInfo, S
return PROJECT_RETRIEVE_DONE; return PROJECT_RETRIEVE_DONE;
} }
static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SSDataBlock* pBlock, SOperatorInfo* pOperator) { static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SSDataBlock* pBlock,
SOperatorInfo* pOperator) {
// set current group id // set current group id
pLimitInfo->currentGroupId = groupId; pLimitInfo->currentGroupId = groupId;
...@@ -170,8 +171,7 @@ static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SS ...@@ -170,8 +171,7 @@ static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SS
} }
// check for the limitation in each group // check for the limitation in each group
if (pLimitInfo->limit.limit >= 0 && if (pLimitInfo->limit.limit >= 0 && pLimitInfo->numOfOutputRows + pBlock->info.rows >= pLimitInfo->limit.limit) {
pLimitInfo->numOfOutputRows + pBlock->info.rows >= pLimitInfo->limit.limit) {
int32_t keepRows = (int32_t)(pLimitInfo->limit.limit - pLimitInfo->numOfOutputRows); int32_t keepRows = (int32_t)(pLimitInfo->limit.limit - pLimitInfo->numOfOutputRows);
blockDataKeepFirstNRows(pBlock, keepRows); blockDataKeepFirstNRows(pBlock, keepRows);
if (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups) { if (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups) {
...@@ -222,7 +222,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { ...@@ -222,7 +222,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
} }
SOperatorInfo* downstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
SLimitInfo* pLimitInfo = &pProjectInfo->limitInfo; SLimitInfo* pLimitInfo = &pProjectInfo->limitInfo;
if (downstream == NULL) { if (downstream == NULL) {
return doGenerateSourceData(pOperator); return doGenerateSourceData(pOperator);
...@@ -317,7 +317,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { ...@@ -317,7 +317,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
if (pOperator->cost.openCost == 0) { if (pOperator->cost.openCost == 0) {
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
} }
// printDataBlock1(p, "project"); // printDataBlock1(p, "project");
return (p->info.rows > 0) ? p : NULL; return (p->info.rows > 0) ? p : NULL;
} }
...@@ -330,6 +330,8 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy ...@@ -330,6 +330,8 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
goto _error; goto _error;
} }
pOperator->pTaskInfo = pTaskInfo;
SExprSupp* pSup = &pOperator->exprSupp; SExprSupp* pSup = &pOperator->exprSupp;
SIndefRowsFuncPhysiNode* pPhyNode = (SIndefRowsFuncPhysiNode*)pNode; SIndefRowsFuncPhysiNode* pPhyNode = (SIndefRowsFuncPhysiNode*)pNode;
...@@ -373,7 +375,6 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy ...@@ -373,7 +375,6 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
pOperator->blocking = false; pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doApplyIndefinitFunction, NULL, NULL, pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doApplyIndefinitFunction, NULL, NULL,
destroyIndefinitOperatorInfo, NULL, NULL, NULL); destroyIndefinitOperatorInfo, NULL, NULL, NULL);
...@@ -385,7 +386,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy ...@@ -385,7 +386,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
return pOperator; return pOperator;
_error: _error:
taosMemoryFree(pInfo); taosMemoryFree(pInfo);
taosMemoryFree(pOperator); taosMemoryFree(pOperator);
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
...@@ -593,7 +594,7 @@ SSDataBlock* doGenerateSourceData(SOperatorInfo* pOperator) { ...@@ -593,7 +594,7 @@ SSDataBlock* doGenerateSourceData(SOperatorInfo* pOperator) {
pRes->info.rows = 1; pRes->info.rows = 1;
doFilter(pProjectInfo->pFilterNode, pRes, NULL); doFilter(pProjectInfo->pFilterNode, pRes, NULL);
/*int32_t status = */doIngroupLimitOffset(&pProjectInfo->limitInfo, 0, pRes, pOperator); /*int32_t status = */ doIngroupLimitOffset(&pProjectInfo->limitInfo, 0, pRes, pOperator);
pOperator->resultInfo.totalRows += pRes->info.rows; pOperator->resultInfo.totalRows += pRes->info.rows;
......
...@@ -30,6 +30,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* ...@@ -30,6 +30,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
goto _error; goto _error;
} }
pOperator->pTaskInfo = pTaskInfo;
SDataBlockDescNode* pDescNode = pSortNode->node.pOutputDataBlockDesc; SDataBlockDescNode* pDescNode = pSortNode->node.pOutputDataBlockDesc;
int32_t numOfCols = 0; int32_t numOfCols = 0;
...@@ -45,7 +46,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* ...@@ -45,7 +46,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
initResultSizeInfo(&pOperator->resultInfo, 1024); initResultSizeInfo(&pOperator->resultInfo, 1024);
pInfo->binfo.pRes = pResBlock; pInfo->binfo.pRes = pResBlock;
pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys); pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys);
pInfo->pCondition = pSortNode->node.pConditions; pInfo->pCondition = pSortNode->node.pConditions;
pInfo->pColMatchInfo = pColMatchColInfo; pInfo->pColMatchInfo = pColMatchColInfo;
initLimitInfo(pSortNode->node.pLimit, pSortNode->node.pSlimit, &pInfo->limitInfo); initLimitInfo(pSortNode->node.pLimit, pSortNode->node.pSlimit, &pInfo->limitInfo);
...@@ -57,7 +58,6 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* ...@@ -57,7 +58,6 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->exprSupp.numOfExprs = numOfCols; pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->pTaskInfo = pTaskInfo;
// lazy evaluation for the following parameter since the input datablock is not known till now. // lazy evaluation for the following parameter since the input datablock is not known till now.
// pInfo->bufPageSize = rowSize < 1024 ? 1024 * 2 : rowSize * 2; // pInfo->bufPageSize = rowSize < 1024 ? 1024 * 2 : rowSize * 2;
...@@ -222,7 +222,7 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) { ...@@ -222,7 +222,7 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) {
} }
// todo add the limit/offset info // todo add the limit/offset info
if (pInfo->limitInfo.remainOffset > 0) { if (pInfo->limitInfo.remainOffset > 0) {
if (pInfo->limitInfo.remainOffset >= blockDataGetNumOfRows(pBlock)) { if (pInfo->limitInfo.remainOffset >= blockDataGetNumOfRows(pBlock)) {
pInfo->limitInfo.remainOffset -= pBlock->info.rows; pInfo->limitInfo.remainOffset -= pBlock->info.rows;
continue; continue;
...@@ -247,7 +247,7 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) { ...@@ -247,7 +247,7 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) {
} }
} }
return blockDataGetNumOfRows(pBlock) > 0? pBlock:NULL; return blockDataGetNumOfRows(pBlock) > 0 ? pBlock : NULL;
} }
void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) { void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
...@@ -474,7 +474,7 @@ void destroyGroupSortOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -474,7 +474,7 @@ void destroyGroupSortOperatorInfo(void* param, int32_t numOfOutput) {
taosArrayDestroy(pInfo->pSortInfo); taosArrayDestroy(pInfo->pSortInfo);
taosArrayDestroy(pInfo->pColMatchInfo); taosArrayDestroy(pInfo->pColMatchInfo);
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }
...@@ -609,8 +609,7 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData ...@@ -609,8 +609,7 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
pInfo->groupId = tsortGetGroupId(pTupleHandle); pInfo->groupId = tsortGetGroupId(pTupleHandle);
pInfo->prefetchedTuple = NULL; pInfo->prefetchedTuple = NULL;
} }
} } else {
else {
pTupleHandle = tsortNextTuple(pHandle); pTupleHandle = tsortNextTuple(pHandle);
pInfo->groupId = 0; pInfo->groupId = 0;
} }
...@@ -694,7 +693,7 @@ void destroyMultiwayMergeOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -694,7 +693,7 @@ void destroyMultiwayMergeOperatorInfo(void* param, int32_t numOfOutput) {
tsortDestroySortHandle(pInfo->pSortHandle); tsortDestroySortHandle(pInfo->pSortHandle);
taosArrayDestroy(pInfo->pSortInfo); taosArrayDestroy(pInfo->pSortInfo);
taosArrayDestroy(pInfo->pColMatchInfo); taosArrayDestroy(pInfo->pColMatchInfo);
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }
...@@ -711,7 +710,7 @@ int32_t getMultiwayMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplai ...@@ -711,7 +710,7 @@ int32_t getMultiwayMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplai
} }
SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size_t numStreams, SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size_t numStreams,
SMergePhysiNode* pMergePhyNode, SExecTaskInfo* pTaskInfo) { SMergePhysiNode* pMergePhyNode, SExecTaskInfo* pTaskInfo) {
SPhysiNode* pPhyNode = (SPhysiNode*)pMergePhyNode; SPhysiNode* pPhyNode = (SPhysiNode*)pMergePhyNode;
SMultiwayMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SMultiwayMergeOperatorInfo)); SMultiwayMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SMultiwayMergeOperatorInfo));
......
...@@ -928,8 +928,9 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ...@@ -928,8 +928,9 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols); TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols);
SResultRow* pResult = NULL; SResultRow* pResult = NULL;
STimeWindow win = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->inputOrder); STimeWindow win =
int32_t ret = TSDB_CODE_SUCCESS; getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->inputOrder);
int32_t ret = TSDB_CODE_SUCCESS;
if ((!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) && if ((!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) &&
inSlidingWindow(&pInfo->interval, &win, &pBlock->info)) { inSlidingWindow(&pInfo->interval, &win, &pBlock->info)) {
ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx, ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
...@@ -1091,8 +1092,8 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { ...@@ -1091,8 +1092,8 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pSup->pCtx, pBlock, pInfo->inputOrder, scanFlag, true); setInputDataBlock(pOperator, pSup->pCtx, pBlock, pInfo->inputOrder, scanFlag, true);
blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex); blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag, NULL); hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag, NULL);
} }
...@@ -1790,9 +1791,10 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -1790,9 +1791,10 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
goto _error; goto _error;
} }
pOperator->pTaskInfo = pTaskInfo;
pInfo->win = pTaskInfo->window; pInfo->win = pTaskInfo->window;
pInfo->inputOrder = (pPhyNode->window.inputTsOrder == ORDER_ASC)? TSDB_ORDER_ASC:TSDB_ORDER_DESC; pInfo->inputOrder = (pPhyNode->window.inputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
pInfo->resultTsOrder = (pPhyNode->window.outputTsOrder == ORDER_ASC)? TSDB_ORDER_ASC:TSDB_ORDER_DESC; pInfo->resultTsOrder = (pPhyNode->window.outputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
pInfo->interval = *pInterval; pInfo->interval = *pInterval;
pInfo->execModel = pTaskInfo->execModel; pInfo->execModel = pTaskInfo->execModel;
pInfo->twAggSup = *pTwAggSupp; pInfo->twAggSup = *pTwAggSupp;
...@@ -1845,7 +1847,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -1845,7 +1847,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pOperator->blocking = true; pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->exprSupp.numOfExprs = numOfCols; pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->info = pInfo; pOperator->info = pInfo;
...@@ -1880,6 +1881,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr ...@@ -1880,6 +1881,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr
goto _error; goto _error;
} }
pOperator->pTaskInfo = pTaskInfo;
pInfo->inputOrder = TSDB_ORDER_ASC; pInfo->inputOrder = TSDB_ORDER_ASC;
pInfo->interval = *pInterval; pInfo->interval = *pInterval;
pInfo->execModel = OPTR_EXEC_MODEL_STREAM; pInfo->execModel = OPTR_EXEC_MODEL_STREAM;
...@@ -1906,7 +1908,6 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr ...@@ -1906,7 +1908,6 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr
pOperator->blocking = true; pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->exprSupp.numOfExprs = numOfCols; pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->info = pInfo; pOperator->info = pInfo;
...@@ -2180,7 +2181,6 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp ...@@ -2180,7 +2181,6 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
break; break;
} }
} }
} }
static int32_t initPrevRowsKeeper(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) { static int32_t initPrevRowsKeeper(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) {
...@@ -2457,7 +2457,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW ...@@ -2457,7 +2457,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
int32_t numOfCols = 0; int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols); SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols);
SSDataBlock* pResBlock = createResDataBlock(pSessionNode->window.node.pOutputDataBlockDesc); SSDataBlock* pResBlock = createResDataBlock(pSessionNode->window.node.pOutputDataBlockDesc);
...@@ -2475,11 +2475,11 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW ...@@ -2475,11 +2475,11 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW
initResultRowInfo(&pInfo->binfo.resultRowInfo); initResultRowInfo(&pInfo->binfo.resultRowInfo);
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
pInfo->tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId; pInfo->tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
pInfo->binfo.pRes = pResBlock; pInfo->binfo.pRes = pResBlock;
pInfo->winSup.prevTs = INT64_MIN; pInfo->winSup.prevTs = INT64_MIN;
pInfo->reptScan = false; pInfo->reptScan = false;
pInfo->pCondition = pSessionNode->window.node.pConditions; pInfo->pCondition = pSessionNode->window.node.pConditions;
pOperator->name = "SessionWindowAggOperator"; pOperator->name = "SessionWindowAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION;
...@@ -3028,6 +3028,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -3028,6 +3028,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
goto _error; goto _error;
} }
pOperator->pTaskInfo = pTaskInfo;
pInfo->order = TSDB_ORDER_ASC; pInfo->order = TSDB_ORDER_ASC;
pInfo->interval = (SInterval){.interval = pIntervalPhyNode->interval, pInfo->interval = (SInterval){.interval = pIntervalPhyNode->interval,
.sliding = pIntervalPhyNode->sliding, .sliding = pIntervalPhyNode->sliding,
...@@ -3114,7 +3115,6 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -3114,7 +3115,6 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pOperator->blocking = true; pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->exprSupp.numOfExprs = numOfCols; pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->info = pInfo; pOperator->info = pInfo;
...@@ -3155,7 +3155,7 @@ void destroyStateWinInfo(void* ptr) { ...@@ -3155,7 +3155,7 @@ void destroyStateWinInfo(void* ptr) {
if (ptr == NULL) { if (ptr == NULL) {
return; return;
} }
SStateWindowInfo* pWin = (SStateWindowInfo*) ptr; SStateWindowInfo* pWin = (SStateWindowInfo*)ptr;
taosMemoryFreeClear(pWin->stateKey.pData); taosMemoryFreeClear(pWin->stateKey.pData);
} }
...@@ -3246,6 +3246,8 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh ...@@ -3246,6 +3246,8 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
goto _error; goto _error;
} }
pOperator->pTaskInfo = pTaskInfo;
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
if (pSessionNode->window.pExprs != NULL) { if (pSessionNode->window.pExprs != NULL) {
int32_t numOfScalar = 0; int32_t numOfScalar = 0;
...@@ -3308,7 +3310,6 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh ...@@ -3308,7 +3310,6 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doStreamSessionAgg, NULL, NULL, destroyStreamSessionAggOperatorInfo, createOperatorFpSet(operatorDummyOpenFn, doStreamSessionAgg, NULL, NULL, destroyStreamSessionAggOperatorInfo,
aggEncodeResultRow, aggDecodeResultRow, NULL); aggEncodeResultRow, aggDecodeResultRow, NULL);
pOperator->pTaskInfo = pTaskInfo;
if (downstream) { if (downstream) {
initDownStream(downstream, &pInfo->streamAggSup, pInfo->gap, pInfo->twAggSup.waterMark, pOperator->operatorType); initDownStream(downstream, &pInfo->streamAggSup, pInfo->gap, pInfo->twAggSup.waterMark, pOperator->operatorType);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
...@@ -3465,7 +3466,7 @@ static int32_t setWindowOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pRes ...@@ -3465,7 +3466,7 @@ static int32_t setWindowOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pRes
assert(pWinInfo->win.skey <= pWinInfo->win.ekey); assert(pWinInfo->win.skey <= pWinInfo->win.ekey);
// too many time window in query // too many time window in query
int32_t size = taosArrayGetSize(pAggSup->pCurWins); int32_t size = taosArrayGetSize(pAggSup->pCurWins);
if (size > MAX_INTERVAL_TIME_WINDOW) { if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH && size > MAX_INTERVAL_TIME_WINDOW) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW); longjmp(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
} }
...@@ -3647,8 +3648,8 @@ void deleteWindow(SArray* pWinInfos, int32_t index, FDelete fp) { ...@@ -3647,8 +3648,8 @@ void deleteWindow(SArray* pWinInfos, int32_t index, FDelete fp) {
taosArrayRemove(pWinInfos, index); taosArrayRemove(pWinInfos, index);
} }
static void doDeleteTimeWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, int64_t gap, static void doDeleteTimeWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, int64_t gap, SArray* result,
SArray* result, FDelete fp) { FDelete fp) {
SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
TSKEY* startDatas = (TSKEY*)pStartTsCol->pData; TSKEY* startDatas = (TSKEY*)pStartTsCol->pData;
SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
...@@ -4673,7 +4674,8 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR ...@@ -4673,7 +4674,8 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
currTs = tsCols[currPos]; currTs = tsCols[currPos];
currWin.skey = currTs; currWin.skey = currTs;
currWin.ekey = taosTimeAdd(currWin.skey, iaInfo->interval.interval, iaInfo->interval.intervalUnit, currWin.ekey = taosTimeAdd(currWin.skey, iaInfo->interval.interval, iaInfo->interval.intervalUnit,
iaInfo->interval.precision) - 1; iaInfo->interval.precision) -
1;
startPos = currPos; startPos = currPos;
ret = setTimeWindowOutputBuf(pResultRowInfo, &currWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx, ret = setTimeWindowOutputBuf(pResultRowInfo, &currWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
...@@ -4933,8 +4935,8 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* ...@@ -4933,8 +4935,8 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
TSKEY blockStartTs = getStartTsKey(&pBlock->info.window, tsCols); TSKEY blockStartTs = getStartTsKey(&pBlock->info.window, tsCols);
SResultRow* pResult = NULL; SResultRow* pResult = NULL;
STimeWindow win = STimeWindow win = getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval,
getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval, iaInfo->inputOrder); iaInfo->inputOrder);
int32_t ret = int32_t ret =
setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx, setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx,
...@@ -4975,7 +4977,8 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* ...@@ -4975,7 +4977,8 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
STimeWindow nextWin = win; STimeWindow nextWin = win;
while (1) { while (1) {
int32_t prevEndPos = forwardRows - 1 + startPos; int32_t prevEndPos = forwardRows - 1 + startPos;
startPos = getNextQualifiedWindow(&iaInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, iaInfo->inputOrder); startPos =
getNextQualifiedWindow(&iaInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, iaInfo->inputOrder);
if (startPos < 0) { if (startPos < 0) {
break; break;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册