diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index b5d972321bcad7da148330491bac51bbbb04c8d4..a91d204efa1f35bf3a523c653a5c073b81fff38c 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4830,6 +4830,7 @@ static SArray* createSortInfo(SNodeList* pNodeList, SNodeList* pNodeListTarget); static SArray* createIndexMap(SNodeList* pNodeList); static SArray* extractPartitionColInfo(SNodeList* pNodeList); static int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode); +static void setJoinColumnInfo(SColumnInfo* pInfo, const SColumnNode* pLeftNode); static SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode) { SInterval interval = { @@ -5624,25 +5625,29 @@ SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOf goto _error; } - pOperator->resultInfo.capacity = 4096; - pOperator->resultInfo.threshold = 4096 * 0.75; - - // initResultRowInf - // o(&pInfo->binfo.resultRowInfo, 8); - pInfo->pRes = pResBlock; + initResultSizeInfo(pOperator, 4096); - pOperator->name = "JoinOperator"; + pInfo->pRes = pResBlock; + pOperator->name = "MergeJoinOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_JOIN; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->pExpr = pExprInfo; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->pExpr = pExprInfo; pOperator->numOfOutput = numOfCols; - pOperator->info = pInfo; + pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; + SOperatorNode* pNode = (SOperatorNode*)pOnCondition; + setJoinColumnInfo(&pInfo->leftCol, (SColumnNode*)pNode->pLeft); + setJoinColumnInfo(&pInfo->rightCol, (SColumnNode*)pNode->pRight); + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyBasicOperatorInfo, NULL, NULL, NULL); int32_t code = appendDownstream(pOperator, pDownstream, numOfDownstream); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + return pOperator; _error: @@ -5651,3 +5656,11 @@ _error: pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; return NULL; } + +void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode) { + pColumn->slotId = pColumnNode->slotId; + pColumn->type = pColumnNode->node.resType.type; + pColumn->bytes = pColumnNode->node.resType.bytes; + pColumn->precision = pColumnNode->node.resType.precision; + pColumn->scale = pColumnNode->node.resType.scale; +}