diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index cbd2dc66f5f9dfa8e2c7219ab2a67e659ec3779f..78bc6947cd9cedbcd44800b1445756e92ca83aea 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -549,8 +549,6 @@ typedef struct SStateWindowOperatorInfo { typedef struct SSortedMergeOperatorInfo { SOptrBasicInfo binfo; - bool hasVarCol; - SArray* pSortInfo; int32_t numOfSources; SSortHandle *pSortHandle; @@ -582,6 +580,24 @@ typedef struct SSortOperatorInfo { uint64_t totalElapsed; // total elapsed time } SSortOperatorInfo; +typedef struct SJoinOperatorInfo { + SSDataBlock *pRes; + int32_t joinType; + + SSDataBlock *pLeft; + int32_t leftPos; + SColumnInfo leftCol; + + SSDataBlock *pRight; + int32_t rightPos; + SColumnInfo rightCol; + + SNode *pOnCondition; +// SJoinStatus *status; +// int32_t numOfUpstream; +// SRspResultInfo resultInfo; +} SJoinOperatorInfo; + int32_t operatorDummyOpenFn(SOperatorInfo* pOperator); void operatorDummyCloseFn(void* param, int32_t numOfCols); int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num); @@ -628,6 +644,8 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo); + #if 0 SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, @@ -635,9 +653,6 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntim SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createTagScanOperatorInfo(SReaderHandle* pReaderHandle, SExprInfo* pExpr, int32_t numOfOutput); - -SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema, - int32_t numOfOutput); #endif void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx, int32_t numOfOutput, SArray* pPseudoList); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 14acff320418e68602a20842333c4865f55bffb6..45674d31bccf019450d318bba25491dcee02d2f5 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -6600,10 +6600,10 @@ bool validateExprColumnInfo(SQueriedTableInfo* pTableInfo, SExprBasicInfo* pExpr static SResSchema createResSchema(int32_t type, int32_t bytes, int32_t slotId, int32_t scale, int32_t precision, const char* name) { SResSchema s = {0}; - s.scale = scale; - s.type = type; - s.bytes = bytes; - s.slotId = slotId; + s.scale = scale; + s.type = type; + s.bytes = bytes; + s.slotId = slotId; s.precision = precision; strncpy(s.name, name, tListLen(s.name)); @@ -6679,8 +6679,7 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* SFunctionNode* pFuncNode = (SFunctionNode*)pTargetNode->pExpr; SDataType* pType = &pFuncNode->node.resType; - pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, - pType->precision, pFuncNode->node.aliasName); + pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pType->precision, pFuncNode->node.aliasName); pExp->pExpr->_function.functionId = pFuncNode->funcId; pExp->pExpr->_function.pFunctNode = pFuncNode; @@ -6756,7 +6755,7 @@ static SArray* createIndexMap(SNodeList* pNodeList); static SArray* extractPartitionColInfo(SNodeList* pNodeList); SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, - uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) { + uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) { if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) { int32_t type = nodeType(pPhyNode); if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) { @@ -6764,6 +6763,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo int32_t numOfCols = 0; tsdbReaderT pDataReader = doCreateDataReader((STableScanPhysiNode*)pPhyNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId); + if (pDataReader == NULL) { + return NULL; + } + SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols); SSDataBlock* pResBlock = createOutputBuf_rv1(pScanPhyNode->node.pOutputDataBlockDesc); @@ -6802,13 +6805,15 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } } + int32_t num = 0; int32_t type = nodeType(pPhyNode); - size_t size = LIST_LENGTH(pPhyNode->pChildren); - ASSERT(size == 1); + size_t size = LIST_LENGTH(pPhyNode->pChildren); - SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); - SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); - int32_t num = 0; + SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES); + for(int32_t i = 0; i < size; ++i) { + SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i); + ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); + } if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) { SProjectPhysiNode* pProjPhyNode = (SProjectPhysiNode*) pPhyNode; @@ -6817,7 +6822,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); SLimit limit = {.limit = pProjPhyNode->limit, .offset = pProjPhyNode->offset}; SLimit slimit = {.limit = pProjPhyNode->slimit, .offset = pProjPhyNode->soffset}; - return createProjectOperatorInfo(op, pExprInfo, num, pResBlock, &limit, &slimit, pTaskInfo); + return createProjectOperatorInfo(ops[0], pExprInfo, num, pResBlock, &limit, &slimit, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_AGG == type) { SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode; SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); @@ -6831,9 +6836,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr); } - return createGroupOperatorInfo(op, pExprInfo, num, pResBlock, pColList, pAggNode->node.pConditions, pScalarExprInfo, numOfScalarExpr, pTaskInfo, NULL); + return createGroupOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pAggNode->node.pConditions, pScalarExprInfo, numOfScalarExpr, pTaskInfo, NULL); } else { - return createAggregateOperatorInfo(op, pExprInfo, num, pResBlock, pTaskInfo, pTableGroupInfo); + return createAggregateOperatorInfo(ops[0], pExprInfo, num, pResBlock, pTaskInfo, pTableGroupInfo); } } else if (QUERY_NODE_PHYSICAL_PLAN_INTERVAL == type) { SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; @@ -6851,33 +6856,39 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo }; int32_t primaryTsSlotId = ((SColumnNode*) pIntervalPhyNode->window.pTspk)->slotId; - return createIntervalOperatorInfo(op, pExprInfo, num, pResBlock, &interval, primaryTsSlotId, pTableGroupInfo, pTaskInfo); + return createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, primaryTsSlotId, pTableGroupInfo, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) { SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode; SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); SArray* info = createSortInfo(pSortPhyNode->pSortKeys, pSortPhyNode->pTargets); SArray* slotMap = createIndexMap(pSortPhyNode->pTargets); - return createSortOperatorInfo(op, pResBlock, info, slotMap, pTaskInfo); + return createSortOperatorInfo(ops[0], pResBlock, info, slotMap, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) { SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode; SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &num); SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); - return createSessionAggOperatorInfo(op, pExprInfo, num, pResBlock, pSessionNode->gap, pTaskInfo); + return createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) { SPartitionPhysiNode* pPartNode = (SPartitionPhysiNode*) pPhyNode; SArray* pColList = extractPartitionColInfo(pPartNode->pPartitionKeys); SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &num); - return createPartitionOperatorInfo(op, pExprInfo, num, pResBlock, pColList, pTaskInfo, NULL); - } else if (QUERY_NODE_STATE_WINDOW == type) { + return createPartitionOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pTaskInfo, NULL); + } else if (QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW == type) { SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*) pPhyNode; SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num); SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); - return createStatewindowOperatorInfo(op, pExprInfo, num, pResBlock, pTaskInfo); + return createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_JOIN == type) { + SJoinPhysiNode* pJoinNode = (SJoinPhysiNode*) pPhyNode; + SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); + + SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &num); + return createJoinOperatorInfo(ops, size, pExprInfo, num, pResBlock, pJoinNode->pOnConditions, pTaskInfo); } else { ASSERT(0); } /*else if (pPhyNode->info.type == OP_MultiTableAggregate) { @@ -7374,4 +7385,135 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo *operatorInfo, SExplainExecInfo return TSDB_CODE_SUCCESS; } +static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator, bool* newgroup) { + SJoinOperatorInfo* pJoinInfo = pOperator->info; +// SOptrBasicInfo* pInfo = &pJoinInfo->binfo; + + SSDataBlock* pRes = pJoinInfo->pRes; + blockDataCleanup(pRes); + blockDataEnsureCapacity(pRes, 4096); + + int32_t nrows = 0; + + while (1) { + bool prevVal = *newgroup; + + if (pJoinInfo->pLeft == NULL || pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) { + SOperatorInfo* ds1 = pOperator->pDownstream[0]; + publishOperatorProfEvent(ds1, QUERY_PROF_BEFORE_OPERATOR_EXEC); + pJoinInfo->pLeft = ds1->getNextFn(ds1, newgroup); + publishOperatorProfEvent(ds1, QUERY_PROF_AFTER_OPERATOR_EXEC); + + pJoinInfo->leftPos = 0; + if (pJoinInfo->pLeft == NULL) { + setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); + break; + } + } + + if (pJoinInfo->pRight == NULL || pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) { + SOperatorInfo* ds2 = pOperator->pDownstream[1]; + publishOperatorProfEvent(ds2, QUERY_PROF_BEFORE_OPERATOR_EXEC); + pJoinInfo->pRight = ds2->getNextFn(ds2, newgroup); + publishOperatorProfEvent(ds2, QUERY_PROF_AFTER_OPERATOR_EXEC); + + pJoinInfo->rightPos = 0; + if (pJoinInfo->pRight == NULL) { + setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); + break; + } + } + + SColumnInfoData* pLeftCol = taosArrayGet(pJoinInfo->pLeft->pDataBlock, pJoinInfo->leftCol.slotId); + char* pLeftVal = colDataGetData(pLeftCol, pJoinInfo->leftPos); + + SColumnInfoData* pRightCol = taosArrayGet(pJoinInfo->pRight->pDataBlock, pJoinInfo->rightCol.slotId); + char* pRightVal = colDataGetData(pRightCol, pJoinInfo->rightPos); + + // only the timestamp match support for ordinary table + ASSERT(pLeftCol->info.type == TSDB_DATA_TYPE_TIMESTAMP); + if (*(int64_t*) pLeftVal == *(int64_t*) pRightVal) { + for(int32_t i = 0; i < pOperator->numOfOutput; ++i) { + SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, i); + + SExprInfo* pExprInfo = &pOperator->pExpr[i]; + + int32_t blockId = pExprInfo->base.pParam[0].pCol->dataBlockId; + int32_t slotId = pExprInfo->base.pParam[0].pCol->slotId; + + SColumnInfoData* pSrc = NULL; + if (pJoinInfo->pLeft->info.blockId == blockId) { + pSrc = taosArrayGet(pJoinInfo->pLeft->pDataBlock, slotId); + } else { + pSrc = taosArrayGet(pJoinInfo->pRight->pDataBlock, slotId); + } + + if (colDataIsNull_s(pSrc, pJoinInfo->leftPos)) { + colDataAppendNULL(pDst, nrows); + } else { + char* p = colDataGetData(pSrc, pJoinInfo->leftPos); + colDataAppend(pDst, nrows, p, false); + } + } + + pJoinInfo->leftPos += 1; + pJoinInfo->rightPos += 1; + + nrows += 1; + } else if (*(int64_t*) pLeftVal < *(int64_t*) pRightVal) { + pJoinInfo->leftPos += 1; + if (pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) { + continue; + } + } else if (*(int64_t*) pLeftVal > *(int64_t*) pRightVal) { + pJoinInfo->rightPos += 1; + if (pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) { + continue; + } + } + + // the pDataBlock are always the same one, no need to call this again + pRes->info.rows = nrows; + if (pRes->info.rows >= pOperator->resultInfo.threshold) { + break; + } + } + + return (pRes->info.rows > 0) ? pRes : NULL; +} + +SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo) { + SJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SJoinOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + if (pOperator == NULL || pInfo == NULL) { + goto _error; + } + + pOperator->resultInfo.capacity = 4096; + pOperator->resultInfo.threshold = 4096 * 0.75; + +// initResultRowInf +// o(&pInfo->binfo.resultRowInfo, 8); + pInfo->pRes = pResBlock; + + pOperator->name = "JoinOperator"; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_JOIN; + pOperator->blockingOptr = true; + pOperator->status = OP_NOT_OPENED; + pOperator->pExpr = pExprInfo; + pOperator->numOfOutput = numOfCols; + pOperator->info = pInfo; + pOperator->pTaskInfo = pTaskInfo; + pOperator->getNextFn = doMergeJoin; + pOperator->closeFn = destroyBasicOperatorInfo; + + int32_t code = appendDownstream(pOperator, pDownstream, numOfDownstream); + return pOperator; + + _error: + taosMemoryFree(pInfo); + taosMemoryFree(pOperator); + pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; + return NULL; +} \ No newline at end of file