diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 77915624dd44c45b274a6ea3ec6fd170c34a2bd8..62146b60486de6854e7fa4732e570565dcd5b0b1 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -881,7 +881,9 @@ int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaul void doDestroyExchangeOperatorInfo(void* param); -void doSetOperatorCompleted(SOperatorInfo* pOperator); +void setOperatorCompleted(SOperatorInfo* pOperator); +void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32_t type, bool blocking, int32_t status, void* pInfo, + SExecTaskInfo* pTaskInfo); void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo); int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* pBlock, int32_t rows, const char* idStr, STableMetaCacheInfo * pCache); diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 41fcc44f97b3baf24715f843e548999139823e9d..6f9084ce52c3ae2caac867c73f23f40b95bff95f 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -93,12 +93,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe p->pCtx = createSqlFunctionCtx(p->pExprInfo, p->numOfExprs, &p->rowEntryInfoOffset); } - pOperator->name = "LastrowScanOperator"; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->pTaskInfo = pTaskInfo; + setOperatorInfo(pOperator, "CachedRowScanOperator", QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); pOperator->fpSet = @@ -126,7 +121,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { uint64_t suid = tableListGetSuid(pTableList); int32_t size = tableListGetSize(pTableList); if (size == 0) { - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); return NULL; } @@ -182,7 +177,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { pInfo->indexOfBufferedRes += 1; return pRes; } else { - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); return NULL; } } else { @@ -234,7 +229,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { } } - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); return NULL; } } diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index cb3330e9ca7e0b18441b71f834f5f65cb47e08bb..049de727dfda4ed6bc39960ae8e024f8807b3ad1 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -213,7 +213,7 @@ static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) { pExchangeInfo->limitInfo.numOfOutputRows += rows; if (rows == 0) { - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); return NULL; } else { return pBlock; @@ -289,13 +289,8 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode pInfo->seqLoadData = false; pInfo->pTransporter = pTransporter; - pOperator->name = "ExchangeOperator"; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; + setOperatorInfo(pOperator, "ExchangeOperator", QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pDummyBlock->pDataBlock); - pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, destroyExchangeOperatorInfo, NULL); return pOperator; @@ -506,7 +501,7 @@ void* setAllSourcesCompleted(SOperatorInfo* pOperator, int64_t startTs) { GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize, pLoadInfo->totalElapsed / 1000.0); - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); return NULL; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index ca1d4cccab97860596dfa9178f14cc31ce359952..709e981a1ff6bcb01a0f4765ec97eaf5c428c180 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -94,7 +94,7 @@ static void destroyIntervalOperatorInfo(void* param); static void destroyOperatorInfo(SOperatorInfo* pOperator); -void doSetOperatorCompleted(SOperatorInfo* pOperator) { +void setOperatorCompleted(SOperatorInfo* pOperator) { pOperator->status = OP_EXEC_DONE; ASSERT(pOperator->pTaskInfo != NULL); @@ -102,6 +102,16 @@ void doSetOperatorCompleted(SOperatorInfo* pOperator) { setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); } +void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32_t type, bool blocking, int32_t status, + void* pInfo, SExecTaskInfo* pTaskInfo) { + pOperator->name = (char*)name; + pOperator->operatorType = type; + pOperator->blocking = blocking; + pOperator->status = status; + pOperator->info = pInfo; + pOperator->pTaskInfo = pTaskInfo; +} + int32_t operatorDummyOpenFn(SOperatorInfo* pOperator) { OPTR_SET_OPENED(pOperator); pOperator->cost.openCost = 0; @@ -1795,7 +1805,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; pTaskInfo->code = pOperator->fpSet._openFn(pOperator); if (pTaskInfo->code != TSDB_CODE_SUCCESS) { - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); return NULL; } @@ -1805,7 +1815,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) { doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); if (!hasRemainResults(&pAggInfo->groupResInfo)) { - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); break; } @@ -2054,7 +2064,7 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { SSDataBlock* pBlock = pDownstream->fpSet.getNextFn(pDownstream); if (pBlock == NULL) { if (pInfo->totalInputRows == 0) { - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); return NULL; } @@ -2131,7 +2141,7 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) { while (true) { fillResult = doFillImpl(pOperator); if (fillResult == NULL) { - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); break; } @@ -2361,13 +2371,8 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN pInfo->binfo.mergeResultBlock = pAggNode->mergeDataBlock; pInfo->groupId = UINT64_MAX; - pOperator->name = "TableAggregate"; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG; - pOperator->blocking = true; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->pTaskInfo = pTaskInfo; + setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG, true, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, destroyAggOperatorInfo, NULL); @@ -2564,14 +2569,8 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* goto _error; } - pOperator->name = "FillOperator"; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL; + setOperatorInfo(pOperator, "FillOperator", QUERY_NODE_PHYSICAL_PLAN_FILL, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->exprSupp.numOfExprs = pInfo->numOfExpr; - pOperator->info = pInfo; - pOperator->pTaskInfo = pTaskInfo; - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, destroyFillOperatorInfo, NULL); code = appendDownstream(pOperator, &downstream, 1); diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index cfcdd0ee6490862de9c2713080d31baafd0a4fcf..cf8dd110b6cfb4455b8ffce2f013f9514bf7f5af 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -316,7 +316,7 @@ static SSDataBlock* buildGroupResultDataBlock(SOperatorInfo* pOperator) { doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL); if (!hasRemainResults(&pInfo->groupResInfo)) { - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); break; } @@ -438,12 +438,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* } initResultRowInfo(&pInfo->binfo.resultRowInfo); - - pOperator->name = "GroupbyAggOperator"; - pOperator->blocking = true; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->pTaskInfo = pTaskInfo; + setOperatorInfo(pOperator, "GroupbyAggOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, true, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, hashGroupbyAggregate, NULL, destroyGroupOperatorInfo, NULL); @@ -654,7 +649,7 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) { // try next group data ++pInfo->groupIndex; if (pInfo->groupIndex >= taosArrayGetSize(pInfo->sortedGroupArray)) { - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); clearPartitionOperator(pInfo); return NULL; } @@ -821,14 +816,9 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition goto _error; } - pOperator->name = "PartitionOperator"; - pOperator->blocking = true; - pOperator->status = OP_NOT_OPENED; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION; + setOperatorInfo(pOperator, "PartitionOperator", QUERY_NODE_PHYSICAL_PLAN_PARTITION, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->exprSupp.numOfExprs = numOfCols; pOperator->exprSupp.pExprInfo = pExprInfo; - pOperator->info = pInfo; - pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, hashPartition, NULL, destroyPartitionOperatorInfo, NULL); @@ -963,7 +953,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) { pInfo->pInputDataBlock = NULL; SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); if (pBlock == NULL) { - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); return NULL; } printDataBlock(pBlock, "stream partitionby recv"); @@ -1103,14 +1093,9 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr int32_t numOfCols = 0; SExprInfo* pExprInfo = createExprInfo(pPartNode->part.pTargets, NULL, &numOfCols); - pOperator->name = "StreamPartitionOperator"; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION; + setOperatorInfo(pOperator, "StreamPartitionOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->exprSupp.numOfExprs = numOfCols; pOperator->exprSupp.pExprInfo = pExprInfo; - pOperator->info = pInfo; - pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamHashPartition, NULL, destroyStreamPartitionOperatorInfo, NULL); diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index e7fe2bfc1201b487d188bc22c49b26c81e17360b..4e1daac6431ea1c9f6c16a2e619157dfea6010b9 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -73,14 +73,10 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t initResultSizeInfo(&pOperator->resultInfo, 4096); pInfo->pRes = pResBlock; - pOperator->name = "MergeJoinOperator"; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; + + setOperatorInfo(pOperator, "MergeJoinOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->exprSupp.numOfExprs = numOfCols; - pOperator->info = pInfo; - pOperator->pTaskInfo = pTaskInfo; extractTimeCondition(pInfo, pDownstream, numOfDownstream, pJoinNode); diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 58cc08265352e5e3a2801bdf367d36e12b65978c..ce1d13775caab667517237b29947749b52464908 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -98,12 +98,8 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys } pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfCols); - pOperator->name = "ProjectOperator"; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; + setOperatorInfo(pOperator, "ProjectOperator", QUERY_NODE_PHYSICAL_PLAN_PROJECT, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL, destroyProjectOperatorInfo, NULL); @@ -153,7 +149,7 @@ static int32_t setInfoForNewGroup(SSDataBlock* pBlock, SLimitInfo* pLimitInfo, S if (pLimitInfo->currentGroupId != 0 && pLimitInfo->currentGroupId != pBlock->info.groupId) { pLimitInfo->numOfOutputGroups += 1; if ((pLimitInfo->slimit.limit > 0) && (pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) { - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); return PROJECT_RETRIEVE_DONE; } @@ -187,7 +183,7 @@ static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SS // TODO: optimize it later when partition by + limit if ((pLimitInfo->slimit.limit == -1 && pLimitInfo->currentGroupId == 0) || (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) { - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); } } @@ -252,7 +248,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { } qDebug("set op close, exec %d, status %d rows %d", pTaskInfo->execModel, pOperator->status, pFinalRes->info.rows); - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); break; } if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) { @@ -400,12 +396,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy pInfo->binfo.pRes = pResBlock; pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr); - pOperator->name = "IndefinitOperator"; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - + setOperatorInfo(pOperator, "IndefinitOperator", QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doApplyIndefinitFunction, NULL, destroyIndefinitOperatorInfo, NULL); code = appendDownstream(pOperator, &downstream, 1); @@ -498,7 +489,7 @@ SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) { // The downstream exec may change the value of the newgroup, so use a local variable instead. SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); if (pBlock == NULL) { - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); break; } @@ -627,7 +618,7 @@ SSDataBlock* doGenerateSourceData(SOperatorInfo* pOperator) { pOperator->resultInfo.totalRows += pRes->info.rows; - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); if (pOperator->cost.openCost == 0) { pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 903843bcc4ef08d072161249fa3cba814649acd1..3da54509ce09ede25a4a8e3db440007bd22bdf1a 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -820,7 +820,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { } else { // scan table group by group sequentially if (pInfo->currentGroupId == -1) { if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pTaskInfo->pTableInfoList)) { - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); return NULL; } @@ -843,7 +843,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { } if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pTaskInfo->pTableInfoList)) { - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); return NULL; } @@ -865,7 +865,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { return result; } - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); return NULL; } } @@ -947,13 +947,8 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, pInfo->currentGroupId = -1; pInfo->assignBlockUid = pTableScanNode->assignBlockUid; - pOperator->name = "TableScanOperator"; // for debug purpose - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; + setOperatorInfo(pOperator, "TableScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->exprSupp.numOfExprs = numOfCols; - pOperator->pTaskInfo = pTaskInfo; pInfo->metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024*128, -1, .5); if (pInfo->metaCache.pTableMetaEntryCache == NULL) { @@ -986,13 +981,7 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pInfo->dataReader = pReadHandle; // pInfo->prevGroupId = -1; - pOperator->name = "TableSeqScanOperator"; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->pTaskInfo = pTaskInfo; - + setOperatorInfo(pOperator, "TableSeqScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScanImpl, NULL, NULL, NULL); return pOperator; } @@ -1148,13 +1137,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi goto _error; } - pOperator->name = "DataBlockDistScanOperator"; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->pTaskInfo = pTaskInfo; - + setOperatorInfo(pOperator, "DataBlockDistScanOperator", QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, destroyBlockDistScanOperatorInfo, NULL); return pOperator; @@ -2367,9 +2350,7 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT pInfo->vnode = pHandle->vnode; pInfo->sContext = pHandle->sContext; - pOperator->name = "RawScanOperator"; - pOperator->info = pInfo; - pOperator->pTaskInfo = pTaskInfo; + setOperatorInfo(pOperator, "RawScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, destroyRawScanOperatorInfo, NULL); return pOperator; @@ -2555,13 +2536,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pInfo->assignBlockUid = pTableScanNode->assignBlockUid; pInfo->partitionSup.needCalc = false; - pOperator->name = "StreamScanOperator"; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; + setOperatorInfo(pOperator, "StreamScanOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); - pOperator->pTaskInfo = pTaskInfo; __optr_fn_t nextFn = pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM ? doStreamScan : doQueueScan; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, NULL); @@ -2899,7 +2875,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { } blockDataDestroy(dataBlock); pInfo->loadInfo.totalRows += pInfo->pRes->info.rows; - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes; } @@ -2952,7 +2928,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { if (ret != 0) { metaCloseTbCursor(pInfo->pCur); pInfo->pCur = NULL; - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); } pInfo->loadInfo.totalRows += pInfo->pRes->info.rows; @@ -3742,7 +3718,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) { } if (i >= taosArrayGetSize(pIdx->uids)) { - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); } else { pIdx->lastIdx = i + 1; } @@ -3924,7 +3900,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) { if (ret != 0) { metaCloseTbCursor(pInfo->pCur); pInfo->pCur = NULL; - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); } pInfo->loadInfo.totalRows += pInfo->pRes->info.rows; @@ -3946,7 +3922,7 @@ static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) { doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo); pInfo->loadInfo.totalRows += pInfo->pRes->info.rows; - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes; } else { if (pInfo->showRewrite == false) { @@ -4198,14 +4174,8 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan pInfo->readHandle = *(SReadHandle*)readHandle; } - pOperator->name = "SysTableScanOperator"; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; + setOperatorInfo(pOperator, "SysTableScanOperator", QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); - pOperator->pTaskInfo = pTaskInfo; - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, destroySysScanOperator, NULL); return pOperator; @@ -4282,7 +4252,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { count += 1; if (++pInfo->curPos >= size) { - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); } } @@ -4334,14 +4304,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi pInfo->readHandle = *pReadHandle; pInfo->curPos = 0; - pOperator->name = "TagScanOperator"; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN; - - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->pTaskInfo = pTaskInfo; - + setOperatorInfo(pOperator, "TagScanOperator", QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); initResultSizeInfo(&pOperator->resultInfo, 4096); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); @@ -4712,7 +4675,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { pInfo->hasGroupId = true; if (tableListSize == 0) { - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); return NULL; } pInfo->tableStartIndex = 0; @@ -4731,7 +4694,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { } else { stopGroupTableMergeScan(pOperator); if (pInfo->tableEndIndex >= tableListSize - 1) { - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); break; } pInfo->tableStartIndex = pInfo->tableEndIndex + 1; @@ -4852,13 +4815,8 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN int32_t rowSize = pInfo->pResBlock->info.rowSize; pInfo->bufPageSize = getProperSortPageSize(rowSize); - pOperator->name = "TableMergeScanOperator"; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; + setOperatorInfo(pOperator, "TableMergeScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->exprSupp.numOfExprs = numOfCols; - pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableMergeScan, NULL, destroyTableMergeScanOperatorInfo, getTableMergeScanExplainExecInfo); diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 700612e0ab2f9d5c0d507dc257c6c9344937e1c3..fc53623d44607ad17503d43351340348ace6b3fb 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -53,11 +53,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys); initLimitInfo(pSortNode->node.pLimit, pSortNode->node.pSlimit, &pInfo->limitInfo); - pOperator->name = "SortOperator"; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT; - pOperator->blocking = true; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; + setOperatorInfo(pOperator, "SortOperator", QUERY_NODE_PHYSICAL_PLAN_SORT, true, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->exprSupp.numOfExprs = numOfCols; @@ -214,7 +210,7 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) { pBlock = getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, pInfo->matchInfo.pList, pInfo); if (pBlock == NULL) { - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); return NULL; } @@ -428,7 +424,7 @@ SSDataBlock* doGroupSort(SOperatorInfo* pOperator) { pInfo->prefetchedSortInput = pOperator->pDownstream[0]->fpSet.getNextFn(pOperator->pDownstream[0]); if (pInfo->prefetchedSortInput == NULL) { - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); return NULL; } pInfo->currGroupId = pInfo->prefetchedSortInput->info.groupId; @@ -453,7 +449,7 @@ SSDataBlock* doGroupSort(SOperatorInfo* pOperator) { beginSortGroup(pOperator); } else if (pInfo->childOpStatus == CHILD_OP_FINISHED) { finishSortGroup(pOperator); - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); return NULL; } } @@ -509,14 +505,7 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSort } pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys); - - pOperator->name = "GroupSortOperator"; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->pTaskInfo = pTaskInfo; - + setOperatorInfo(pOperator, "GroupSortOperator", QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doGroupSort, NULL, destroyGroupSortOperatorInfo, getGroupSortExplainExecInfo); @@ -705,7 +694,7 @@ SSDataBlock* doMultiwayMerge(SOperatorInfo* pOperator) { if (pBlock != NULL) { pOperator->resultInfo.totalRows += pBlock->info.rows; } else { - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); } return pBlock; @@ -774,13 +763,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size pInfo->bufPageSize = getProperSortPageSize(rowSize); pInfo->sortBufSize = pInfo->bufPageSize * (numStreams + 1); // one additional is reserved for merged result. - pOperator->name = "MultiwayMerge"; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->pTaskInfo = pTaskInfo; - + setOperatorInfo(pOperator, "MultiwayMergeOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(doOpenMultiwayMergeOperator, doMultiwayMerge, NULL, destroyMultiwayMergeOperatorInfo, getMultiwayMergeExplainExecInfo); diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index a6ac28829c2b9018cfda0056b6c51f311f13596d..7c9d73ad137d168f749c2aba4d02067952da0e8c 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -1443,7 +1443,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) { printDataBlock(pInfo->pRes, "stream fill"); return pInfo->pRes; } - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); resetStreamFillInfo(pInfo); return NULL; } @@ -1512,7 +1512,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) { } if (pInfo->pRes->info.rows == 0) { - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); resetStreamFillInfo(pInfo); return NULL; } @@ -1690,13 +1690,7 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi } pInfo->srcRowIndex = 0; - - pOperator->name = "StreamFillOperator"; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL; - pOperator->info = pInfo; - pOperator->pTaskInfo = pTaskInfo; + setOperatorInfo(pOperator, "StreamFillOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamFill, NULL, destroyStreamFillOperatorInfo, NULL); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 6d8282ff5d5d3acc4046bae96918d790dd9529c1..ca250f304a82674406ebbb093ce6a8409a6554c1 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1221,7 +1221,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { pTaskInfo->code = pOperator->fpSet._openFn(pOperator); if (pTaskInfo->code != TSDB_CODE_SUCCESS) { - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); return NULL; } @@ -1232,7 +1232,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { bool hasRemain = hasRemainResults(&pInfo->groupResInfo); if (!hasRemain) { - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); break; } @@ -1269,7 +1269,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) { bool hasRemain = hasRemainResults(&pInfo->groupResInfo); if (!hasRemain) { - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); break; } @@ -1739,7 +1739,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh ASSERT(as.calTrigger != STREAM_TRIGGER_MAX_DELAY); - pOperator->pTaskInfo = pTaskInfo; pInfo->win = pTaskInfo->window; pInfo->inputOrder = (pPhyNode->window.inputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; pInfo->resultTsOrder = (pPhyNode->window.outputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; @@ -1777,12 +1776,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh } initResultRowInfo(&pInfo->binfo.resultRowInfo); - - pOperator->name = "TimeIntervalAggOperator"; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL; - pOperator->blocking = true; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; + setOperatorInfo(pOperator, "TimeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL, true, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, NULL, destroyIntervalOperatorInfo, NULL); @@ -1890,7 +1884,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { bool hasRemain = hasRemainResults(&pInfo->groupResInfo); if (!hasRemain) { - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); break; } @@ -1933,7 +1927,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { bool hasRemain = hasRemainResults(&pInfo->groupResInfo); if (!hasRemain) { - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); break; } @@ -2281,7 +2275,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { } if (pSliceInfo->current > pSliceInfo->win.ekey) { - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); break; } @@ -2330,7 +2324,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { } if (pSliceInfo->current > pSliceInfo->win.ekey) { - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); break; } } @@ -2342,7 +2336,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); if (pSliceInfo->current > pSliceInfo->win.ekey) { - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); break; } } @@ -2365,7 +2359,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { } if (pSliceInfo->current > pSliceInfo->win.ekey) { - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); break; } } @@ -2386,7 +2380,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { } if (pSliceInfo->current > pSliceInfo->win.ekey) { - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); break; } } else { @@ -2448,7 +2442,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { } if (pSliceInfo->current > pSliceInfo->win.ekey) { - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); break; } } @@ -2463,7 +2457,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { } if (pSliceInfo->current > pSliceInfo->win.ekey) { - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); break; } } @@ -2557,13 +2551,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode pScanInfo->cond.twindows = pInfo->win; pScanInfo->cond.type = TIMEWINDOW_RANGE_EXTERNAL; - pOperator->name = "TimeSliceOperator"; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->pTaskInfo = pTaskInfo; - + setOperatorInfo(pOperator, "TimeSliceOperator", QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTimeslice, NULL, destroyTimeSliceOperatorInfo, NULL); @@ -2633,13 +2621,8 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); pInfo->tsSlotId = tsSlotId; - pOperator->name = "StateWindowOperator"; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE; - pOperator->blocking = true; - pOperator->status = OP_NOT_OPENED; - pOperator->pTaskInfo = pTaskInfo; - pOperator->info = pInfo; + setOperatorInfo(pOperator, "StateWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(openStateWindowAggOptr, doStateWindowAgg, NULL, destroyStateWindowOperatorInfo, NULL); @@ -2711,12 +2694,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW goto _error; } - pOperator->name = "SessionWindowAggOperator"; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION; - pOperator->blocking = true; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - + setOperatorInfo(pOperator, "SessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, true, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSessionWindowAgg, NULL, destroySWindowOperatorInfo, NULL); pOperator->pTaskInfo = pTaskInfo; @@ -3134,7 +3112,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { return pInfo->binfo.pRes; } - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); if (!IS_FINAL_OP(pInfo)) { clearFunctionContext(&pOperator->exprSupp); // semi interval operator clear disk buffer @@ -4024,7 +4002,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { return pBInfo->pRes; } - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); return NULL; } @@ -4124,7 +4102,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { return pBInfo->pRes; } - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); return NULL; } @@ -4191,13 +4169,11 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh pInfo->pGroupIdTbNameMap = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK); - pOperator->name = "StreamSessionWindowAggOperator"; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION; - pOperator->blocking = true; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamSessionAgg, NULL, - destroyStreamSessionAggOperatorInfo, NULL); + setOperatorInfo(pOperator, "StreamSessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true, + OP_NOT_OPENED, pInfo, pTaskInfo); + pOperator->fpSet = + createOperatorFpSet(operatorDummyOpenFn, doStreamSessionAgg, NULL, destroyStreamSessionAggOperatorInfo, NULL); + if (downstream) { initDownStream(downstream, &pInfo->streamAggSup, pInfo->twAggSup.waterMark, pOperator->operatorType, pInfo->primaryTsIndex); @@ -4248,7 +4224,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { clearFunctionContext(&pOperator->exprSupp); // semi interval operator clear disk buffer clearStreamSessionOperator(pInfo); - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); return NULL; } } @@ -4321,7 +4297,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { clearFunctionContext(&pOperator->exprSupp); // semi interval operator clear disk buffer clearStreamSessionOperator(pInfo); - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); return NULL; } @@ -4332,20 +4308,21 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream if (pOperator == NULL) { goto _error; } + SStreamSessionAggOperatorInfo* pInfo = pOperator->info; - if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) { - pInfo->isFinal = true; - pOperator->name = "StreamSessionFinalAggOperator"; - } else { - pInfo->isFinal = false; + pInfo->isFinal = (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION); + char* name = (pInfo->isFinal)? "StreamSessionFinalAggOperator":"StreamSessionSemiAggOperator"; + + if (pPhyNode->type != QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) { pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR); blockDataEnsureCapacity(pInfo->pUpdateRes, 128); - pOperator->name = "StreamSessionSemiAggOperator"; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamSessionSemiAgg, NULL, destroyStreamSessionAggOperatorInfo, NULL); } + setOperatorInfo(pOperator, name, pPhyNode->type , false, OP_NOT_OPENED, pInfo, pTaskInfo); + pInfo->pGroupIdTbNameMap = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK); @@ -4575,7 +4552,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { return pBInfo->pRes; } - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); return NULL; } @@ -4641,7 +4618,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { printDataBlock(pBInfo->pRes, "single state"); return pBInfo->pRes; } - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); return NULL; } @@ -4706,12 +4683,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->pGroupIdTbNameMap = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK); - pOperator->name = "StreamStateAggOperator"; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE; - pOperator->blocking = true; - pOperator->status = OP_NOT_OPENED; - pOperator->pTaskInfo = pTaskInfo; - pOperator->info = pInfo; + setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamStateAgg, NULL, destroyStreamStateOperatorInfo, NULL); initDownStream(downstream, &pInfo->streamAggSup, pInfo->twAggSup.waterMark, pOperator->operatorType, @@ -4861,7 +4833,7 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { cleanupAfterGroupResultGen(pMiaInfo, pRes); } - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); break; } @@ -4986,13 +4958,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, initResultRowInfo(&iaInfo->binfo.resultRowInfo); blockDataEnsureCapacity(iaInfo->binfo.pRes, pOperator->resultInfo.capacity); - - pOperator->name = "TimeMergeAlignedIntervalAggOperator"; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->pTaskInfo = pTaskInfo; - pOperator->info = miaInfo; + setOperatorInfo(pOperator, "TimeMergeAlignedIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL, false, OP_NOT_OPENED, miaInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, mergeAlignedIntervalAgg, NULL, destroyMAIOperatorInfo, NULL); @@ -5239,7 +5205,7 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) { } if (pRes->info.rows == 0) { - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); } size_t rows = pRes->info.rows; @@ -5298,14 +5264,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge } initResultRowInfo(&pIntervalInfo->binfo.resultRowInfo); - - pOperator->name = "TimeMergeIntervalAggOperator"; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->pTaskInfo = pTaskInfo; - pOperator->info = pMergeIntervalInfo; - + setOperatorInfo(pOperator, "TimeMergeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, false, OP_NOT_OPENED, pMergeIntervalInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doMergeIntervalAgg, NULL, destroyMergeIntervalOperatorInfo, NULL); @@ -5351,7 +5310,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { } deleteIntervalDiscBuf(pInfo->pState, NULL, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark, &pInfo->interval, &pInfo->delKey); - doSetOperatorCompleted(pOperator); + setOperatorCompleted(pOperator); streamStateCommit(pTaskInfo->streamInfo.pState); return NULL; } @@ -5535,11 +5494,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->pGroupIdTbNameMap = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK); - pOperator->name = "StreamIntervalOperator"; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL; - pOperator->blocking = true; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; + setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamIntervalAgg, NULL, destroyStreamFinalIntervalOperatorInfo, NULL); diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 55642fa1a1680bec211992493f78b7fde944e0ee..079e553b07472cda2142f0d13673dab782ddfbc6 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -3112,6 +3112,8 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { } #else if (!pInputCol->hasNull) { + numOfElems = 1; + int32_t round = pInput->numOfRows >> 2; int32_t reminder = pInput->numOfRows & 0x03; @@ -3143,7 +3145,6 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { } for (int32_t i = pInput->startRowIndex + round * 4; i < pInput->startRowIndex + pInput->numOfRows; ++i) { - numOfElems++; if (pResInfo->numOfRes == 0 || pInfo->ts < pts[i]) { char* data = colDataGetData(pInputCol, i); doSaveCurrentVal(pCtx, i, pts[i], type, data); @@ -3173,7 +3174,8 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { if (numOfElems == 0) { firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo); } - SET_VAL(pResInfo, numOfElems, 1); + +// SET_VAL(pResInfo, numOfElems, 1); return TSDB_CODE_SUCCESS; }