diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index e35da0c45a8773cfe3f801575895cf26bd042dca..5cfe796a2992435179da65656b18abae9ebf7f92 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -711,7 +711,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SExprInfo* pExpr, int32_t numOfOutput, SSDataBlock* pResBlock, SArray* pColMatchInfo, STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo); #if 0 diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 99a69df79992616e2379f5fc6d42f3132b39d159..1327ddb48ece4816e817aae39c42a7e4c35ef2d7 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1148,14 +1148,9 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, fmGetScalarFuncExecFuncs(pCtx->functionId, &pCtx->sfp); if (pCtx->sfp.getEnv != NULL) { pCtx->sfp.getEnv(pExpr->pExpr->_function.pFunctNode, &env); - } else { // to work around the interbuffer size requirement. - ASSERT(fmIsUserDefinedFunc(pCtx->functionId)); -// env.calcMemSize = pFunct->resSchema.bytes; } } pCtx->resDataInfo.interBufSize = env.calcMemSize; - // todo remove it later. -// ASSERT(pCtx->resDataInfo.interBufSize > 0); } else if (pExpr->pExpr->nodeType == QUERY_NODE_COLUMN || pExpr->pExpr->nodeType == QUERY_NODE_OPERATOR || pExpr->pExpr->nodeType == QUERY_NODE_VALUE) { // for simple column, the result buffer needs to hold at least one element. @@ -4379,6 +4374,10 @@ void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) { doDestroyBasicInfo(pInfo, numOfOutput); } +void destroyMergeJoinOperator(void* param, int32_t numOfOutput) { + SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*) param; +} + void destroyAggOperatorInfo(void* param, int32_t numOfOutput) { SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param; doDestroyBasicInfo(&pInfo->binfo, numOfOutput); @@ -4955,7 +4954,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &num); - pOptr = createJoinOperatorInfo(ops, size, pExprInfo, num, pResBlock, pJoinNode->pOnConditions, pTaskInfo); + pOptr = createMergeJoinOperatorInfo(ops, size, pExprInfo, num, pResBlock, pJoinNode->pOnConditions, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) { SFillPhysiNode* pFillNode = (SFillPhysiNode*)pPhyNode; SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); @@ -5527,7 +5526,7 @@ static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) { return (pRes->info.rows > 0) ? pRes : NULL; } -SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, +SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo) { SJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SJoinOperatorInfo)); @@ -5553,7 +5552,7 @@ SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOf setJoinColumnInfo(&pInfo->rightCol, (SColumnNode*)pNode->pRight); pOperator->fpSet = - createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyBasicOperatorInfo, NULL, NULL, NULL); + createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyMergeJoinOperator, NULL, NULL, NULL); int32_t code = appendDownstream(pOperator, pDownstream, numOfDownstream); if (code != TSDB_CODE_SUCCESS) { goto _error;