提交 2402fd1d 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 3f62f14c
...@@ -881,7 +881,9 @@ int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaul ...@@ -881,7 +881,9 @@ int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaul
void doDestroyExchangeOperatorInfo(void* param); void doDestroyExchangeOperatorInfo(void* param);
void doSetOperatorCompleted(SOperatorInfo* pOperator); void setOperatorCompleted(SOperatorInfo* pOperator);
void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32_t type, bool blocking, int32_t status, void* pInfo,
SExecTaskInfo* pTaskInfo);
void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo); void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo);
int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr,
SSDataBlock* pBlock, int32_t rows, const char* idStr, STableMetaCacheInfo * pCache); SSDataBlock* pBlock, int32_t rows, const char* idStr, STableMetaCacheInfo * pCache);
......
...@@ -93,12 +93,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe ...@@ -93,12 +93,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
p->pCtx = createSqlFunctionCtx(p->pExprInfo, p->numOfExprs, &p->rowEntryInfoOffset); p->pCtx = createSqlFunctionCtx(p->pExprInfo, p->numOfExprs, &p->rowEntryInfoOffset);
} }
pOperator->name = "LastrowScanOperator"; setOperatorInfo(pOperator, "CachedRowScanOperator", QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
pOperator->fpSet = pOperator->fpSet =
...@@ -126,7 +121,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { ...@@ -126,7 +121,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
uint64_t suid = tableListGetSuid(pTableList); uint64_t suid = tableListGetSuid(pTableList);
int32_t size = tableListGetSize(pTableList); int32_t size = tableListGetSize(pTableList);
if (size == 0) { if (size == 0) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
return NULL; return NULL;
} }
...@@ -182,7 +177,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { ...@@ -182,7 +177,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
pInfo->indexOfBufferedRes += 1; pInfo->indexOfBufferedRes += 1;
return pRes; return pRes;
} else { } else {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
return NULL; return NULL;
} }
} else { } else {
...@@ -234,7 +229,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { ...@@ -234,7 +229,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
} }
} }
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
return NULL; return NULL;
} }
} }
......
...@@ -213,7 +213,7 @@ static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) { ...@@ -213,7 +213,7 @@ static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) {
pExchangeInfo->limitInfo.numOfOutputRows += rows; pExchangeInfo->limitInfo.numOfOutputRows += rows;
if (rows == 0) { if (rows == 0) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
return NULL; return NULL;
} else { } else {
return pBlock; return pBlock;
...@@ -289,13 +289,8 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode ...@@ -289,13 +289,8 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode
pInfo->seqLoadData = false; pInfo->seqLoadData = false;
pInfo->pTransporter = pTransporter; pInfo->pTransporter = pTransporter;
pOperator->name = "ExchangeOperator"; setOperatorInfo(pOperator, "ExchangeOperator", QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pDummyBlock->pDataBlock); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pDummyBlock->pDataBlock);
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, destroyExchangeOperatorInfo, NULL); pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, destroyExchangeOperatorInfo, NULL);
return pOperator; return pOperator;
...@@ -506,7 +501,7 @@ void* setAllSourcesCompleted(SOperatorInfo* pOperator, int64_t startTs) { ...@@ -506,7 +501,7 @@ void* setAllSourcesCompleted(SOperatorInfo* pOperator, int64_t startTs) {
GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize, GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize,
pLoadInfo->totalElapsed / 1000.0); pLoadInfo->totalElapsed / 1000.0);
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
return NULL; return NULL;
} }
......
...@@ -94,7 +94,7 @@ static void destroyIntervalOperatorInfo(void* param); ...@@ -94,7 +94,7 @@ static void destroyIntervalOperatorInfo(void* param);
static void destroyOperatorInfo(SOperatorInfo* pOperator); static void destroyOperatorInfo(SOperatorInfo* pOperator);
void doSetOperatorCompleted(SOperatorInfo* pOperator) { void setOperatorCompleted(SOperatorInfo* pOperator) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
ASSERT(pOperator->pTaskInfo != NULL); ASSERT(pOperator->pTaskInfo != NULL);
...@@ -102,6 +102,16 @@ void doSetOperatorCompleted(SOperatorInfo* pOperator) { ...@@ -102,6 +102,16 @@ void doSetOperatorCompleted(SOperatorInfo* pOperator) {
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
} }
void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32_t type, bool blocking, int32_t status,
void* pInfo, SExecTaskInfo* pTaskInfo) {
pOperator->name = (char*)name;
pOperator->operatorType = type;
pOperator->blocking = blocking;
pOperator->status = status;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
}
int32_t operatorDummyOpenFn(SOperatorInfo* pOperator) { int32_t operatorDummyOpenFn(SOperatorInfo* pOperator) {
OPTR_SET_OPENED(pOperator); OPTR_SET_OPENED(pOperator);
pOperator->cost.openCost = 0; pOperator->cost.openCost = 0;
...@@ -1795,7 +1805,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) { ...@@ -1795,7 +1805,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
pTaskInfo->code = pOperator->fpSet._openFn(pOperator); pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) { if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
return NULL; return NULL;
} }
...@@ -1805,7 +1815,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) { ...@@ -1805,7 +1815,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
if (!hasRemainResults(&pAggInfo->groupResInfo)) { if (!hasRemainResults(&pAggInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
break; break;
} }
...@@ -2054,7 +2064,7 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { ...@@ -2054,7 +2064,7 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
SSDataBlock* pBlock = pDownstream->fpSet.getNextFn(pDownstream); SSDataBlock* pBlock = pDownstream->fpSet.getNextFn(pDownstream);
if (pBlock == NULL) { if (pBlock == NULL) {
if (pInfo->totalInputRows == 0) { if (pInfo->totalInputRows == 0) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
return NULL; return NULL;
} }
...@@ -2131,7 +2141,7 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) { ...@@ -2131,7 +2141,7 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) {
while (true) { while (true) {
fillResult = doFillImpl(pOperator); fillResult = doFillImpl(pOperator);
if (fillResult == NULL) { if (fillResult == NULL) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
break; break;
} }
...@@ -2361,13 +2371,8 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN ...@@ -2361,13 +2371,8 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN
pInfo->binfo.mergeResultBlock = pAggNode->mergeDataBlock; pInfo->binfo.mergeResultBlock = pAggNode->mergeDataBlock;
pInfo->groupId = UINT64_MAX; pInfo->groupId = UINT64_MAX;
pOperator->name = "TableAggregate";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG, true, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, destroyAggOperatorInfo, NULL); createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, destroyAggOperatorInfo, NULL);
...@@ -2564,14 +2569,8 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* ...@@ -2564,14 +2569,8 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
goto _error; goto _error;
} }
pOperator->name = "FillOperator"; setOperatorInfo(pOperator, "FillOperator", QUERY_NODE_PHYSICAL_PLAN_FILL, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL;
pOperator->exprSupp.numOfExprs = pInfo->numOfExpr; pOperator->exprSupp.numOfExprs = pInfo->numOfExpr;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, destroyFillOperatorInfo, NULL); pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, destroyFillOperatorInfo, NULL);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
......
...@@ -316,7 +316,7 @@ static SSDataBlock* buildGroupResultDataBlock(SOperatorInfo* pOperator) { ...@@ -316,7 +316,7 @@ static SSDataBlock* buildGroupResultDataBlock(SOperatorInfo* pOperator) {
doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL); doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
if (!hasRemainResults(&pInfo->groupResInfo)) { if (!hasRemainResults(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
break; break;
} }
...@@ -438,12 +438,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* ...@@ -438,12 +438,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode*
} }
initResultRowInfo(&pInfo->binfo.resultRowInfo); initResultRowInfo(&pInfo->binfo.resultRowInfo);
setOperatorInfo(pOperator, "GroupbyAggOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, true, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->name = "GroupbyAggOperator";
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, hashGroupbyAggregate, NULL, destroyGroupOperatorInfo, NULL); createOperatorFpSet(operatorDummyOpenFn, hashGroupbyAggregate, NULL, destroyGroupOperatorInfo, NULL);
...@@ -654,7 +649,7 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) { ...@@ -654,7 +649,7 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
// try next group data // try next group data
++pInfo->groupIndex; ++pInfo->groupIndex;
if (pInfo->groupIndex >= taosArrayGetSize(pInfo->sortedGroupArray)) { if (pInfo->groupIndex >= taosArrayGetSize(pInfo->sortedGroupArray)) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
clearPartitionOperator(pInfo); clearPartitionOperator(pInfo);
return NULL; return NULL;
} }
...@@ -821,14 +816,9 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition ...@@ -821,14 +816,9 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
goto _error; goto _error;
} }
pOperator->name = "PartitionOperator"; setOperatorInfo(pOperator, "PartitionOperator", QUERY_NODE_PHYSICAL_PLAN_PARTITION, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION;
pOperator->exprSupp.numOfExprs = numOfCols; pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, hashPartition, NULL, destroyPartitionOperatorInfo, NULL); createOperatorFpSet(operatorDummyOpenFn, hashPartition, NULL, destroyPartitionOperatorInfo, NULL);
...@@ -963,7 +953,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) { ...@@ -963,7 +953,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
pInfo->pInputDataBlock = NULL; pInfo->pInputDataBlock = NULL;
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) { if (pBlock == NULL) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
return NULL; return NULL;
} }
printDataBlock(pBlock, "stream partitionby recv"); printDataBlock(pBlock, "stream partitionby recv");
...@@ -1103,14 +1093,9 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr ...@@ -1103,14 +1093,9 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
int32_t numOfCols = 0; int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pPartNode->part.pTargets, NULL, &numOfCols); SExprInfo* pExprInfo = createExprInfo(pPartNode->part.pTargets, NULL, &numOfCols);
pOperator->name = "StreamPartitionOperator"; setOperatorInfo(pOperator, "StreamPartitionOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION;
pOperator->exprSupp.numOfExprs = numOfCols; pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamHashPartition, NULL, pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamHashPartition, NULL,
destroyStreamPartitionOperatorInfo, NULL); destroyStreamPartitionOperatorInfo, NULL);
......
...@@ -73,14 +73,10 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t ...@@ -73,14 +73,10 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
pInfo->pRes = pResBlock; pInfo->pRes = pResBlock;
pOperator->name = "MergeJoinOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN; setOperatorInfo(pOperator, "MergeJoinOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->exprSupp.numOfExprs = numOfCols; pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
extractTimeCondition(pInfo, pDownstream, numOfDownstream, pJoinNode); extractTimeCondition(pInfo, pDownstream, numOfDownstream, pJoinNode);
......
...@@ -98,12 +98,8 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys ...@@ -98,12 +98,8 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
} }
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfCols); pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfCols);
pOperator->name = "ProjectOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
setOperatorInfo(pOperator, "ProjectOperator", QUERY_NODE_PHYSICAL_PLAN_PROJECT, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL, pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL,
destroyProjectOperatorInfo, NULL); destroyProjectOperatorInfo, NULL);
...@@ -153,7 +149,7 @@ static int32_t setInfoForNewGroup(SSDataBlock* pBlock, SLimitInfo* pLimitInfo, S ...@@ -153,7 +149,7 @@ static int32_t setInfoForNewGroup(SSDataBlock* pBlock, SLimitInfo* pLimitInfo, S
if (pLimitInfo->currentGroupId != 0 && pLimitInfo->currentGroupId != pBlock->info.groupId) { if (pLimitInfo->currentGroupId != 0 && pLimitInfo->currentGroupId != pBlock->info.groupId) {
pLimitInfo->numOfOutputGroups += 1; pLimitInfo->numOfOutputGroups += 1;
if ((pLimitInfo->slimit.limit > 0) && (pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) { if ((pLimitInfo->slimit.limit > 0) && (pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
return PROJECT_RETRIEVE_DONE; return PROJECT_RETRIEVE_DONE;
} }
...@@ -187,7 +183,7 @@ static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SS ...@@ -187,7 +183,7 @@ static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SS
// TODO: optimize it later when partition by + limit // TODO: optimize it later when partition by + limit
if ((pLimitInfo->slimit.limit == -1 && pLimitInfo->currentGroupId == 0) || if ((pLimitInfo->slimit.limit == -1 && pLimitInfo->currentGroupId == 0) ||
(pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) { (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
} }
} }
...@@ -252,7 +248,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { ...@@ -252,7 +248,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
} }
qDebug("set op close, exec %d, status %d rows %d", pTaskInfo->execModel, pOperator->status, qDebug("set op close, exec %d, status %d rows %d", pTaskInfo->execModel, pOperator->status,
pFinalRes->info.rows); pFinalRes->info.rows);
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
break; break;
} }
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) { if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) {
...@@ -400,12 +396,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy ...@@ -400,12 +396,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
pInfo->binfo.pRes = pResBlock; pInfo->binfo.pRes = pResBlock;
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr); pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr);
pOperator->name = "IndefinitOperator"; setOperatorInfo(pOperator, "IndefinitOperator", QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doApplyIndefinitFunction, NULL, destroyIndefinitOperatorInfo, NULL); pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doApplyIndefinitFunction, NULL, destroyIndefinitOperatorInfo, NULL);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
...@@ -498,7 +489,7 @@ SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) { ...@@ -498,7 +489,7 @@ SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
// The downstream exec may change the value of the newgroup, so use a local variable instead. // The downstream exec may change the value of the newgroup, so use a local variable instead.
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) { if (pBlock == NULL) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
break; break;
} }
...@@ -627,7 +618,7 @@ SSDataBlock* doGenerateSourceData(SOperatorInfo* pOperator) { ...@@ -627,7 +618,7 @@ SSDataBlock* doGenerateSourceData(SOperatorInfo* pOperator) {
pOperator->resultInfo.totalRows += pRes->info.rows; pOperator->resultInfo.totalRows += pRes->info.rows;
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
if (pOperator->cost.openCost == 0) { if (pOperator->cost.openCost == 0) {
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
} }
......
...@@ -820,7 +820,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { ...@@ -820,7 +820,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
} else { // scan table group by group sequentially } else { // scan table group by group sequentially
if (pInfo->currentGroupId == -1) { if (pInfo->currentGroupId == -1) {
if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pTaskInfo->pTableInfoList)) { if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pTaskInfo->pTableInfoList)) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
return NULL; return NULL;
} }
...@@ -843,7 +843,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { ...@@ -843,7 +843,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
} }
if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pTaskInfo->pTableInfoList)) { if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pTaskInfo->pTableInfoList)) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
return NULL; return NULL;
} }
...@@ -865,7 +865,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { ...@@ -865,7 +865,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
return result; return result;
} }
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
return NULL; return NULL;
} }
} }
...@@ -947,13 +947,8 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, ...@@ -947,13 +947,8 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
pInfo->currentGroupId = -1; pInfo->currentGroupId = -1;
pInfo->assignBlockUid = pTableScanNode->assignBlockUid; pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
pOperator->name = "TableScanOperator"; // for debug purpose setOperatorInfo(pOperator, "TableScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->exprSupp.numOfExprs = numOfCols; pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->pTaskInfo = pTaskInfo;
pInfo->metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024*128, -1, .5); pInfo->metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024*128, -1, .5);
if (pInfo->metaCache.pTableMetaEntryCache == NULL) { if (pInfo->metaCache.pTableMetaEntryCache == NULL) {
...@@ -986,13 +981,7 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* ...@@ -986,13 +981,7 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo*
pInfo->dataReader = pReadHandle; pInfo->dataReader = pReadHandle;
// pInfo->prevGroupId = -1; // pInfo->prevGroupId = -1;
pOperator->name = "TableSeqScanOperator"; setOperatorInfo(pOperator, "TableSeqScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScanImpl, NULL, NULL, NULL); pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScanImpl, NULL, NULL, NULL);
return pOperator; return pOperator;
} }
...@@ -1148,13 +1137,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi ...@@ -1148,13 +1137,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi
goto _error; goto _error;
} }
pOperator->name = "DataBlockDistScanOperator"; setOperatorInfo(pOperator, "DataBlockDistScanOperator", QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, destroyBlockDistScanOperatorInfo, NULL); pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, destroyBlockDistScanOperatorInfo, NULL);
return pOperator; return pOperator;
...@@ -2367,9 +2350,7 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT ...@@ -2367,9 +2350,7 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT
pInfo->vnode = pHandle->vnode; pInfo->vnode = pHandle->vnode;
pInfo->sContext = pHandle->sContext; pInfo->sContext = pHandle->sContext;
pOperator->name = "RawScanOperator"; setOperatorInfo(pOperator, "RawScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, destroyRawScanOperatorInfo, NULL); pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, destroyRawScanOperatorInfo, NULL);
return pOperator; return pOperator;
...@@ -2555,13 +2536,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys ...@@ -2555,13 +2536,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo->assignBlockUid = pTableScanNode->assignBlockUid; pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
pInfo->partitionSup.needCalc = false; pInfo->partitionSup.needCalc = false;
pOperator->name = "StreamScanOperator"; setOperatorInfo(pOperator, "StreamScanOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
pOperator->pTaskInfo = pTaskInfo;
__optr_fn_t nextFn = pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM ? doStreamScan : doQueueScan; __optr_fn_t nextFn = pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM ? doStreamScan : doQueueScan;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, NULL); pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, NULL);
...@@ -2899,7 +2875,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { ...@@ -2899,7 +2875,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
} }
blockDataDestroy(dataBlock); blockDataDestroy(dataBlock);
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows; pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes; return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
} }
...@@ -2952,7 +2928,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { ...@@ -2952,7 +2928,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
if (ret != 0) { if (ret != 0) {
metaCloseTbCursor(pInfo->pCur); metaCloseTbCursor(pInfo->pCur);
pInfo->pCur = NULL; pInfo->pCur = NULL;
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
} }
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows; pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
...@@ -3742,7 +3718,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) { ...@@ -3742,7 +3718,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
} }
if (i >= taosArrayGetSize(pIdx->uids)) { if (i >= taosArrayGetSize(pIdx->uids)) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
} else { } else {
pIdx->lastIdx = i + 1; pIdx->lastIdx = i + 1;
} }
...@@ -3924,7 +3900,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) { ...@@ -3924,7 +3900,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
if (ret != 0) { if (ret != 0) {
metaCloseTbCursor(pInfo->pCur); metaCloseTbCursor(pInfo->pCur);
pInfo->pCur = NULL; pInfo->pCur = NULL;
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
} }
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows; pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
...@@ -3946,7 +3922,7 @@ static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) { ...@@ -3946,7 +3922,7 @@ static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) {
doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo); doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo);
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows; pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes; return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
} else { } else {
if (pInfo->showRewrite == false) { if (pInfo->showRewrite == false) {
...@@ -4198,14 +4174,8 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan ...@@ -4198,14 +4174,8 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
pInfo->readHandle = *(SReadHandle*)readHandle; pInfo->readHandle = *(SReadHandle*)readHandle;
} }
pOperator->name = "SysTableScanOperator"; setOperatorInfo(pOperator, "SysTableScanOperator", QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, destroySysScanOperator, NULL); pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, destroySysScanOperator, NULL);
return pOperator; return pOperator;
...@@ -4282,7 +4252,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { ...@@ -4282,7 +4252,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
count += 1; count += 1;
if (++pInfo->curPos >= size) { if (++pInfo->curPos >= size) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
} }
} }
...@@ -4334,14 +4304,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi ...@@ -4334,14 +4304,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
pInfo->readHandle = *pReadHandle; pInfo->readHandle = *pReadHandle;
pInfo->curPos = 0; pInfo->curPos = 0;
pOperator->name = "TagScanOperator"; setOperatorInfo(pOperator, "TagScanOperator", QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
...@@ -4712,7 +4675,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { ...@@ -4712,7 +4675,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
pInfo->hasGroupId = true; pInfo->hasGroupId = true;
if (tableListSize == 0) { if (tableListSize == 0) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
return NULL; return NULL;
} }
pInfo->tableStartIndex = 0; pInfo->tableStartIndex = 0;
...@@ -4731,7 +4694,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { ...@@ -4731,7 +4694,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
} else { } else {
stopGroupTableMergeScan(pOperator); stopGroupTableMergeScan(pOperator);
if (pInfo->tableEndIndex >= tableListSize - 1) { if (pInfo->tableEndIndex >= tableListSize - 1) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
break; break;
} }
pInfo->tableStartIndex = pInfo->tableEndIndex + 1; pInfo->tableStartIndex = pInfo->tableEndIndex + 1;
...@@ -4852,13 +4815,8 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN ...@@ -4852,13 +4815,8 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
int32_t rowSize = pInfo->pResBlock->info.rowSize; int32_t rowSize = pInfo->pResBlock->info.rowSize;
pInfo->bufPageSize = getProperSortPageSize(rowSize); pInfo->bufPageSize = getProperSortPageSize(rowSize);
pOperator->name = "TableMergeScanOperator"; setOperatorInfo(pOperator, "TableMergeScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->exprSupp.numOfExprs = numOfCols; pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableMergeScan, NULL, pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableMergeScan, NULL,
destroyTableMergeScanOperatorInfo, getTableMergeScanExplainExecInfo); destroyTableMergeScanOperatorInfo, getTableMergeScanExplainExecInfo);
......
...@@ -53,11 +53,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* ...@@ -53,11 +53,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys); pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys);
initLimitInfo(pSortNode->node.pLimit, pSortNode->node.pSlimit, &pInfo->limitInfo); initLimitInfo(pSortNode->node.pLimit, pSortNode->node.pSlimit, &pInfo->limitInfo);
pOperator->name = "SortOperator"; setOperatorInfo(pOperator, "SortOperator", QUERY_NODE_PHYSICAL_PLAN_SORT, true, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->exprSupp.numOfExprs = numOfCols; pOperator->exprSupp.numOfExprs = numOfCols;
...@@ -214,7 +210,7 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) { ...@@ -214,7 +210,7 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) {
pBlock = getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, pBlock = getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity,
pInfo->matchInfo.pList, pInfo); pInfo->matchInfo.pList, pInfo);
if (pBlock == NULL) { if (pBlock == NULL) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
return NULL; return NULL;
} }
...@@ -428,7 +424,7 @@ SSDataBlock* doGroupSort(SOperatorInfo* pOperator) { ...@@ -428,7 +424,7 @@ SSDataBlock* doGroupSort(SOperatorInfo* pOperator) {
pInfo->prefetchedSortInput = pOperator->pDownstream[0]->fpSet.getNextFn(pOperator->pDownstream[0]); pInfo->prefetchedSortInput = pOperator->pDownstream[0]->fpSet.getNextFn(pOperator->pDownstream[0]);
if (pInfo->prefetchedSortInput == NULL) { if (pInfo->prefetchedSortInput == NULL) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
return NULL; return NULL;
} }
pInfo->currGroupId = pInfo->prefetchedSortInput->info.groupId; pInfo->currGroupId = pInfo->prefetchedSortInput->info.groupId;
...@@ -453,7 +449,7 @@ SSDataBlock* doGroupSort(SOperatorInfo* pOperator) { ...@@ -453,7 +449,7 @@ SSDataBlock* doGroupSort(SOperatorInfo* pOperator) {
beginSortGroup(pOperator); beginSortGroup(pOperator);
} else if (pInfo->childOpStatus == CHILD_OP_FINISHED) { } else if (pInfo->childOpStatus == CHILD_OP_FINISHED) {
finishSortGroup(pOperator); finishSortGroup(pOperator);
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
return NULL; return NULL;
} }
} }
...@@ -509,14 +505,7 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSort ...@@ -509,14 +505,7 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSort
} }
pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys); pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys);
setOperatorInfo(pOperator, "GroupSortOperator", QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->name = "GroupSortOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doGroupSort, NULL, destroyGroupSortOperatorInfo, pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doGroupSort, NULL, destroyGroupSortOperatorInfo,
getGroupSortExplainExecInfo); getGroupSortExplainExecInfo);
...@@ -705,7 +694,7 @@ SSDataBlock* doMultiwayMerge(SOperatorInfo* pOperator) { ...@@ -705,7 +694,7 @@ SSDataBlock* doMultiwayMerge(SOperatorInfo* pOperator) {
if (pBlock != NULL) { if (pBlock != NULL) {
pOperator->resultInfo.totalRows += pBlock->info.rows; pOperator->resultInfo.totalRows += pBlock->info.rows;
} else { } else {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
} }
return pBlock; return pBlock;
...@@ -774,13 +763,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size ...@@ -774,13 +763,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
pInfo->bufPageSize = getProperSortPageSize(rowSize); pInfo->bufPageSize = getProperSortPageSize(rowSize);
pInfo->sortBufSize = pInfo->bufPageSize * (numStreams + 1); // one additional is reserved for merged result. pInfo->sortBufSize = pInfo->bufPageSize * (numStreams + 1); // one additional is reserved for merged result.
pOperator->name = "MultiwayMerge"; setOperatorInfo(pOperator, "MultiwayMergeOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(doOpenMultiwayMergeOperator, doMultiwayMerge, NULL, pOperator->fpSet = createOperatorFpSet(doOpenMultiwayMergeOperator, doMultiwayMerge, NULL,
destroyMultiwayMergeOperatorInfo, getMultiwayMergeExplainExecInfo); destroyMultiwayMergeOperatorInfo, getMultiwayMergeExplainExecInfo);
......
...@@ -1443,7 +1443,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) { ...@@ -1443,7 +1443,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
printDataBlock(pInfo->pRes, "stream fill"); printDataBlock(pInfo->pRes, "stream fill");
return pInfo->pRes; return pInfo->pRes;
} }
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
resetStreamFillInfo(pInfo); resetStreamFillInfo(pInfo);
return NULL; return NULL;
} }
...@@ -1512,7 +1512,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) { ...@@ -1512,7 +1512,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
} }
if (pInfo->pRes->info.rows == 0) { if (pInfo->pRes->info.rows == 0) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
resetStreamFillInfo(pInfo); resetStreamFillInfo(pInfo);
return NULL; return NULL;
} }
...@@ -1690,13 +1690,7 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi ...@@ -1690,13 +1690,7 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi
} }
pInfo->srcRowIndex = 0; pInfo->srcRowIndex = 0;
setOperatorInfo(pOperator, "StreamFillOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->name = "StreamFillOperator";
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doStreamFill, NULL, destroyStreamFillOperatorInfo, NULL); createOperatorFpSet(operatorDummyOpenFn, doStreamFill, NULL, destroyStreamFillOperatorInfo, NULL);
......
...@@ -1221,7 +1221,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { ...@@ -1221,7 +1221,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
pTaskInfo->code = pOperator->fpSet._openFn(pOperator); pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) { if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
return NULL; return NULL;
} }
...@@ -1232,7 +1232,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { ...@@ -1232,7 +1232,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
bool hasRemain = hasRemainResults(&pInfo->groupResInfo); bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
if (!hasRemain) { if (!hasRemain) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
break; break;
} }
...@@ -1269,7 +1269,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) { ...@@ -1269,7 +1269,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
bool hasRemain = hasRemainResults(&pInfo->groupResInfo); bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
if (!hasRemain) { if (!hasRemain) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
break; break;
} }
...@@ -1739,7 +1739,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh ...@@ -1739,7 +1739,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh
ASSERT(as.calTrigger != STREAM_TRIGGER_MAX_DELAY); ASSERT(as.calTrigger != STREAM_TRIGGER_MAX_DELAY);
pOperator->pTaskInfo = pTaskInfo;
pInfo->win = pTaskInfo->window; pInfo->win = pTaskInfo->window;
pInfo->inputOrder = (pPhyNode->window.inputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; pInfo->inputOrder = (pPhyNode->window.inputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
pInfo->resultTsOrder = (pPhyNode->window.outputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; pInfo->resultTsOrder = (pPhyNode->window.outputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
...@@ -1777,12 +1776,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh ...@@ -1777,12 +1776,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh
} }
initResultRowInfo(&pInfo->binfo.resultRowInfo); initResultRowInfo(&pInfo->binfo.resultRowInfo);
setOperatorInfo(pOperator, "TimeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL, true, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->name = "TimeIntervalAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, NULL, destroyIntervalOperatorInfo, NULL); createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, NULL, destroyIntervalOperatorInfo, NULL);
...@@ -1890,7 +1884,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { ...@@ -1890,7 +1884,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
bool hasRemain = hasRemainResults(&pInfo->groupResInfo); bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
if (!hasRemain) { if (!hasRemain) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
break; break;
} }
...@@ -1933,7 +1927,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { ...@@ -1933,7 +1927,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
bool hasRemain = hasRemainResults(&pInfo->groupResInfo); bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
if (!hasRemain) { if (!hasRemain) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
break; break;
} }
...@@ -2281,7 +2275,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { ...@@ -2281,7 +2275,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
} }
if (pSliceInfo->current > pSliceInfo->win.ekey) { if (pSliceInfo->current > pSliceInfo->win.ekey) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
break; break;
} }
...@@ -2330,7 +2324,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { ...@@ -2330,7 +2324,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
} }
if (pSliceInfo->current > pSliceInfo->win.ekey) { if (pSliceInfo->current > pSliceInfo->win.ekey) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
break; break;
} }
} }
...@@ -2342,7 +2336,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { ...@@ -2342,7 +2336,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
pSliceInfo->current = pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
if (pSliceInfo->current > pSliceInfo->win.ekey) { if (pSliceInfo->current > pSliceInfo->win.ekey) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
break; break;
} }
} }
...@@ -2365,7 +2359,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { ...@@ -2365,7 +2359,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
} }
if (pSliceInfo->current > pSliceInfo->win.ekey) { if (pSliceInfo->current > pSliceInfo->win.ekey) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
break; break;
} }
} }
...@@ -2386,7 +2380,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { ...@@ -2386,7 +2380,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
} }
if (pSliceInfo->current > pSliceInfo->win.ekey) { if (pSliceInfo->current > pSliceInfo->win.ekey) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
break; break;
} }
} else { } else {
...@@ -2448,7 +2442,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { ...@@ -2448,7 +2442,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
} }
if (pSliceInfo->current > pSliceInfo->win.ekey) { if (pSliceInfo->current > pSliceInfo->win.ekey) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
break; break;
} }
} }
...@@ -2463,7 +2457,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { ...@@ -2463,7 +2457,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
} }
if (pSliceInfo->current > pSliceInfo->win.ekey) { if (pSliceInfo->current > pSliceInfo->win.ekey) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
break; break;
} }
} }
...@@ -2557,13 +2551,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode ...@@ -2557,13 +2551,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
pScanInfo->cond.twindows = pInfo->win; pScanInfo->cond.twindows = pInfo->win;
pScanInfo->cond.type = TIMEWINDOW_RANGE_EXTERNAL; pScanInfo->cond.type = TIMEWINDOW_RANGE_EXTERNAL;
pOperator->name = "TimeSliceOperator"; setOperatorInfo(pOperator, "TimeSliceOperator", QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doTimeslice, NULL, destroyTimeSliceOperatorInfo, NULL); createOperatorFpSet(operatorDummyOpenFn, doTimeslice, NULL, destroyTimeSliceOperatorInfo, NULL);
...@@ -2633,13 +2621,8 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi ...@@ -2633,13 +2621,8 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
pInfo->tsSlotId = tsSlotId; pInfo->tsSlotId = tsSlotId;
pOperator->name = "StateWindowOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->pTaskInfo = pTaskInfo;
pOperator->info = pInfo;
setOperatorInfo(pOperator, "StateWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(openStateWindowAggOptr, doStateWindowAgg, NULL, destroyStateWindowOperatorInfo, NULL); createOperatorFpSet(openStateWindowAggOptr, doStateWindowAgg, NULL, destroyStateWindowOperatorInfo, NULL);
...@@ -2711,12 +2694,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW ...@@ -2711,12 +2694,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW
goto _error; goto _error;
} }
pOperator->name = "SessionWindowAggOperator"; setOperatorInfo(pOperator, "SessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, true, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doSessionWindowAgg, NULL, destroySWindowOperatorInfo, NULL); createOperatorFpSet(operatorDummyOpenFn, doSessionWindowAgg, NULL, destroySWindowOperatorInfo, NULL);
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
...@@ -3134,7 +3112,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -3134,7 +3112,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
return pInfo->binfo.pRes; return pInfo->binfo.pRes;
} }
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
if (!IS_FINAL_OP(pInfo)) { if (!IS_FINAL_OP(pInfo)) {
clearFunctionContext(&pOperator->exprSupp); clearFunctionContext(&pOperator->exprSupp);
// semi interval operator clear disk buffer // semi interval operator clear disk buffer
...@@ -4024,7 +4002,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { ...@@ -4024,7 +4002,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
return pBInfo->pRes; return pBInfo->pRes;
} }
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
return NULL; return NULL;
} }
...@@ -4124,7 +4102,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { ...@@ -4124,7 +4102,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
return pBInfo->pRes; return pBInfo->pRes;
} }
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
return NULL; return NULL;
} }
...@@ -4191,13 +4169,11 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh ...@@ -4191,13 +4169,11 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
pInfo->pGroupIdTbNameMap = pInfo->pGroupIdTbNameMap =
taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK); taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
pOperator->name = "StreamSessionWindowAggOperator"; setOperatorInfo(pOperator, "StreamSessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true,
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION; OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->blocking = true; pOperator->fpSet =
pOperator->status = OP_NOT_OPENED; createOperatorFpSet(operatorDummyOpenFn, doStreamSessionAgg, NULL, destroyStreamSessionAggOperatorInfo, NULL);
pOperator->info = pInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamSessionAgg, NULL,
destroyStreamSessionAggOperatorInfo, NULL);
if (downstream) { if (downstream) {
initDownStream(downstream, &pInfo->streamAggSup, pInfo->twAggSup.waterMark, pOperator->operatorType, initDownStream(downstream, &pInfo->streamAggSup, pInfo->twAggSup.waterMark, pOperator->operatorType,
pInfo->primaryTsIndex); pInfo->primaryTsIndex);
...@@ -4248,7 +4224,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { ...@@ -4248,7 +4224,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
clearFunctionContext(&pOperator->exprSupp); clearFunctionContext(&pOperator->exprSupp);
// semi interval operator clear disk buffer // semi interval operator clear disk buffer
clearStreamSessionOperator(pInfo); clearStreamSessionOperator(pInfo);
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
return NULL; return NULL;
} }
} }
...@@ -4321,7 +4297,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { ...@@ -4321,7 +4297,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
clearFunctionContext(&pOperator->exprSupp); clearFunctionContext(&pOperator->exprSupp);
// semi interval operator clear disk buffer // semi interval operator clear disk buffer
clearStreamSessionOperator(pInfo); clearStreamSessionOperator(pInfo);
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
return NULL; return NULL;
} }
...@@ -4332,20 +4308,21 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream ...@@ -4332,20 +4308,21 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
if (pOperator == NULL) { if (pOperator == NULL) {
goto _error; goto _error;
} }
SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) { pInfo->isFinal = (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION);
pInfo->isFinal = true; char* name = (pInfo->isFinal)? "StreamSessionFinalAggOperator":"StreamSessionSemiAggOperator";
pOperator->name = "StreamSessionFinalAggOperator";
} else { if (pPhyNode->type != QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
pInfo->isFinal = false;
pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR); pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR);
blockDataEnsureCapacity(pInfo->pUpdateRes, 128); blockDataEnsureCapacity(pInfo->pUpdateRes, 128);
pOperator->name = "StreamSessionSemiAggOperator";
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamSessionSemiAgg, NULL, pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamSessionSemiAgg, NULL,
destroyStreamSessionAggOperatorInfo, NULL); destroyStreamSessionAggOperatorInfo, NULL);
} }
setOperatorInfo(pOperator, name, pPhyNode->type , false, OP_NOT_OPENED, pInfo, pTaskInfo);
pInfo->pGroupIdTbNameMap = pInfo->pGroupIdTbNameMap =
taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK); taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
...@@ -4575,7 +4552,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { ...@@ -4575,7 +4552,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
return pBInfo->pRes; return pBInfo->pRes;
} }
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
return NULL; return NULL;
} }
...@@ -4641,7 +4618,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { ...@@ -4641,7 +4618,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
printDataBlock(pBInfo->pRes, "single state"); printDataBlock(pBInfo->pRes, "single state");
return pBInfo->pRes; return pBInfo->pRes;
} }
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
return NULL; return NULL;
} }
...@@ -4706,12 +4683,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys ...@@ -4706,12 +4683,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->pGroupIdTbNameMap = pInfo->pGroupIdTbNameMap =
taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK); taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
pOperator->name = "StreamStateAggOperator"; setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->pTaskInfo = pTaskInfo;
pOperator->info = pInfo;
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doStreamStateAgg, NULL, destroyStreamStateOperatorInfo, NULL); createOperatorFpSet(operatorDummyOpenFn, doStreamStateAgg, NULL, destroyStreamStateOperatorInfo, NULL);
initDownStream(downstream, &pInfo->streamAggSup, pInfo->twAggSup.waterMark, pOperator->operatorType, initDownStream(downstream, &pInfo->streamAggSup, pInfo->twAggSup.waterMark, pOperator->operatorType,
...@@ -4861,7 +4833,7 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { ...@@ -4861,7 +4833,7 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
cleanupAfterGroupResultGen(pMiaInfo, pRes); cleanupAfterGroupResultGen(pMiaInfo, pRes);
} }
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
break; break;
} }
...@@ -4986,13 +4958,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -4986,13 +4958,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
initResultRowInfo(&iaInfo->binfo.resultRowInfo); initResultRowInfo(&iaInfo->binfo.resultRowInfo);
blockDataEnsureCapacity(iaInfo->binfo.pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(iaInfo->binfo.pRes, pOperator->resultInfo.capacity);
setOperatorInfo(pOperator, "TimeMergeAlignedIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL, false, OP_NOT_OPENED, miaInfo, pTaskInfo);
pOperator->name = "TimeMergeAlignedIntervalAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->pTaskInfo = pTaskInfo;
pOperator->info = miaInfo;
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, mergeAlignedIntervalAgg, NULL, destroyMAIOperatorInfo, NULL); createOperatorFpSet(operatorDummyOpenFn, mergeAlignedIntervalAgg, NULL, destroyMAIOperatorInfo, NULL);
...@@ -5239,7 +5205,7 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) { ...@@ -5239,7 +5205,7 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
} }
if (pRes->info.rows == 0) { if (pRes->info.rows == 0) {
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
} }
size_t rows = pRes->info.rows; size_t rows = pRes->info.rows;
...@@ -5298,14 +5264,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge ...@@ -5298,14 +5264,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge
} }
initResultRowInfo(&pIntervalInfo->binfo.resultRowInfo); initResultRowInfo(&pIntervalInfo->binfo.resultRowInfo);
setOperatorInfo(pOperator, "TimeMergeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, false, OP_NOT_OPENED, pMergeIntervalInfo, pTaskInfo);
pOperator->name = "TimeMergeIntervalAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->pTaskInfo = pTaskInfo;
pOperator->info = pMergeIntervalInfo;
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doMergeIntervalAgg, NULL, destroyMergeIntervalOperatorInfo, NULL); createOperatorFpSet(operatorDummyOpenFn, doMergeIntervalAgg, NULL, destroyMergeIntervalOperatorInfo, NULL);
...@@ -5351,7 +5310,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -5351,7 +5310,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
} }
deleteIntervalDiscBuf(pInfo->pState, NULL, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark, &pInfo->interval, deleteIntervalDiscBuf(pInfo->pState, NULL, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark, &pInfo->interval,
&pInfo->delKey); &pInfo->delKey);
doSetOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
streamStateCommit(pTaskInfo->streamInfo.pState); streamStateCommit(pTaskInfo->streamInfo.pState);
return NULL; return NULL;
} }
...@@ -5535,11 +5494,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys ...@@ -5535,11 +5494,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->pGroupIdTbNameMap = pInfo->pGroupIdTbNameMap =
taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK); taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
pOperator->name = "StreamIntervalOperator"; setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamIntervalAgg, NULL, pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamIntervalAgg, NULL,
destroyStreamFinalIntervalOperatorInfo, NULL); destroyStreamFinalIntervalOperatorInfo, NULL);
......
...@@ -3112,6 +3112,8 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { ...@@ -3112,6 +3112,8 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
} }
#else #else
if (!pInputCol->hasNull) { if (!pInputCol->hasNull) {
numOfElems = 1;
int32_t round = pInput->numOfRows >> 2; int32_t round = pInput->numOfRows >> 2;
int32_t reminder = pInput->numOfRows & 0x03; int32_t reminder = pInput->numOfRows & 0x03;
...@@ -3143,7 +3145,6 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { ...@@ -3143,7 +3145,6 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
} }
for (int32_t i = pInput->startRowIndex + round * 4; i < pInput->startRowIndex + pInput->numOfRows; ++i) { for (int32_t i = pInput->startRowIndex + round * 4; i < pInput->startRowIndex + pInput->numOfRows; ++i) {
numOfElems++;
if (pResInfo->numOfRes == 0 || pInfo->ts < pts[i]) { if (pResInfo->numOfRes == 0 || pInfo->ts < pts[i]) {
char* data = colDataGetData(pInputCol, i); char* data = colDataGetData(pInputCol, i);
doSaveCurrentVal(pCtx, i, pts[i], type, data); doSaveCurrentVal(pCtx, i, pts[i], type, data);
...@@ -3173,7 +3174,8 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { ...@@ -3173,7 +3174,8 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
if (numOfElems == 0) { if (numOfElems == 0) {
firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo); firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo);
} }
SET_VAL(pResInfo, numOfElems, 1);
// SET_VAL(pResInfo, numOfElems, 1);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册