/*
 * Copyright (c) 2019 TAOS Data, Inc.
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "function.h"
#include "os.h"
#include "querynodes.h"
#include "tdatablock.h"
#include "tmsg.h"

#include "executorimpl.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"

// Row capacity of the join result block. The same value seeds the operator's result-size info,
// so the output threshold and the allocated capacity stay in sync.
enum { MERGE_JOIN_RESULT_CAPACITY = 4096 };

static void         setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode);
static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator);
static void         destroyMergeJoinOperator(void* param, int32_t numOfOutput);

/**
 * Create a merge-join operator that joins the output of two downstream operators.
 *
 * @param pDownstream     downstream operators; index 0 is the left input, index 1 the right input
 * @param numOfDownstream number of entries in pDownstream
 * @param pExprInfo       output expressions; each one's first parameter names the source column
 * @param numOfCols       number of entries in pExprInfo
 * @param pResBlock       block that receives joined rows; stored in the operator info and
 *                        released by destroyMergeJoinOperator
 * @param pOnCondition    join condition; only a single operator node (left column vs. right column)
 *                        is handled, and a logic condition trips an ASSERT
 * @param pTaskInfo       owning task; its `code` is set when creation fails
 * @return the new operator, or NULL on failure
 */
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo,
                                           int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition,
                                           SExecTaskInfo* pTaskInfo) {
  // Allocation failure is the only error before appendDownstream, which may report its own code.
  int32_t            code = TSDB_CODE_OUT_OF_MEMORY;
  SJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(*pInfo));
  SOperatorInfo*     pOperator = taosMemoryCalloc(1, sizeof(*pOperator));
  if (pOperator == NULL || pInfo == NULL) {
    goto _error;
  }

  initResultSizeInfo(pOperator, MERGE_JOIN_RESULT_CAPACITY);

  pInfo->pRes = pResBlock;
  pOperator->name = "MergeJoinOperator";
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_JOIN;
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->pExpr = pExprInfo;
  pOperator->numOfExprs = numOfCols;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;

  if (nodeType(pOnCondition) == QUERY_NODE_OPERATOR) {
    SOperatorNode* pNode = (SOperatorNode*)pOnCondition;
    // NOTE(review): both operands are assumed to be column nodes -- confirm the planner guarantees this.
    setJoinColumnInfo(&pInfo->leftCol, (SColumnNode*)pNode->pLeft);
    setJoinColumnInfo(&pInfo->rightCol, (SColumnNode*)pNode->pRight);
  } else if (nodeType(pOnCondition) == QUERY_NODE_LOGIC_CONDITION) {
    ASSERT(0);  // compound join conditions are not supported yet
  }

  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyMergeJoinOperator, NULL,
                                         NULL, NULL);

  code = appendDownstream(pOperator, pDownstream, numOfDownstream);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  pTaskInfo->code = code;  // report the real cause rather than always "out of memory"
  return NULL;
}

/**
 * Copy the slot id and result type (type, bytes, precision, scale) of a column node
 * into a column-info descriptor.
 */
static void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode) {
  pColumn->slotId = pColumnNode->slotId;
  pColumn->type = pColumnNode->node.resType.type;
  pColumn->bytes = pColumnNode->node.resType.bytes;
  pColumn->precision = pColumnNode->node.resType.precision;
  pColumn->scale = pColumnNode->node.resType.scale;
}

/**
 * Release the operator info created by createMergeJoinOperatorInfo, including the result block
 * it owns. pLeft/pRight are not freed: they are blocks returned by the downstream operators.
 */
static void destroyMergeJoinOperator(void* param, int32_t numOfOutput) {
  SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*)param;
  if (pJoinOperator == NULL) {
    return;
  }

  blockDataDestroy(pJoinOperator->pRes);
  pJoinOperator->pRes = NULL;
  taosMemoryFreeClear(param);
}

/**
 * Pull blocks from a downstream operator until one with at least one row arrives.
 * Returns NULL when the downstream is exhausted. Empty blocks are skipped so the caller
 * never indexes row 0 of a block that has no rows.
 */
static SSDataBlock* fetchNonEmptyBlock(SOperatorInfo* pDownstream) {
  while (1) {
    publishOperatorProfEvent(pDownstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
    SSDataBlock* pBlock = pDownstream->fpSet.getNextFn(pDownstream);
    publishOperatorProfEvent(pDownstream, QUERY_PROF_AFTER_OPERATOR_EXEC);

    if (pBlock == NULL || pBlock->info.rows > 0) {
      return pBlock;
    }
  }
}

/**
 * Mark the operator and its task as finished, so later calls return without
 * touching the exhausted downstreams again.
 */
static void setMergeJoinCompleted(SOperatorInfo* pOperator) {
  pOperator->status = OP_EXEC_DONE;
  setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
}

/**
 * Write one joined row into row `dstRow` of pRes.
 *
 * Each output expression's first parameter names a (dataBlockId, slotId) column. The value is
 * copied from the left block's current row when the block ids match; otherwise it is copied
 * from the right block's current row. NULLs are preserved.
 */
static void appendJoinedRow(SOperatorInfo* pOperator, SJoinOperatorInfo* pJoinInfo, SSDataBlock* pRes,
                            int32_t dstRow) {
  for (int32_t i = 0; i < pOperator->numOfExprs; ++i) {
    SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, i);
    SExprInfo*       pExprInfo = &pOperator->pExpr[i];

    int32_t blockId = pExprInfo->base.pParam[0].pCol->dataBlockId;
    int32_t slotId = pExprInfo->base.pParam[0].pCol->slotId;

    SColumnInfoData* pSrc = NULL;
    int32_t          rowIndex = -1;
    if (pJoinInfo->pLeft->info.blockId == blockId) {
      pSrc = taosArrayGet(pJoinInfo->pLeft->pDataBlock, slotId);
      rowIndex = pJoinInfo->leftPos;
    } else {
      pSrc = taosArrayGet(pJoinInfo->pRight->pDataBlock, slotId);
      rowIndex = pJoinInfo->rightPos;
    }

    if (colDataIsNull_s(pSrc, rowIndex)) {
      colDataAppendNULL(pDst, dstRow);
    } else {
      char* p = colDataGetData(pSrc, rowIndex);
      colDataAppend(pDst, dstRow, p, false);
    }
  }
}

/**
 * Produce the next batch of joined rows.
 *
 * Both inputs are walked in step on their join columns, which are read as int64 timestamps
 * (only timestamp equality joins on ordinary tables are supported). Each step advances the
 * side with the smaller key; equal keys emit one row and advance both sides.
 * Returns pRes when it holds rows, otherwise NULL (inputs exhausted or an error;
 * on error, pTaskInfo->code is set).
 */
static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SJoinOperatorInfo* pJoinInfo = pOperator->info;
  SSDataBlock*       pRes = pJoinInfo->pRes;

  blockDataCleanup(pRes);
  int32_t code = blockDataEnsureCapacity(pRes, MERGE_JOIN_RESULT_CAPACITY);
  if (code != TSDB_CODE_SUCCESS) {
    pOperator->pTaskInfo->code = code;
    return NULL;
  }

  int32_t nrows = 0;
  while (1) {
    // Refill an input once its current block is consumed; either side running dry ends the join.
    if (pJoinInfo->pLeft == NULL || pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) {
      pJoinInfo->pLeft = fetchNonEmptyBlock(pOperator->pDownstream[0]);
      pJoinInfo->leftPos = 0;
      if (pJoinInfo->pLeft == NULL) {
        setMergeJoinCompleted(pOperator);
        break;
      }
    }

    if (pJoinInfo->pRight == NULL || pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) {
      pJoinInfo->pRight = fetchNonEmptyBlock(pOperator->pDownstream[1]);
      pJoinInfo->rightPos = 0;
      if (pJoinInfo->pRight == NULL) {
        setMergeJoinCompleted(pOperator);
        break;
      }
    }

    SColumnInfoData* pLeftCol = taosArrayGet(pJoinInfo->pLeft->pDataBlock, pJoinInfo->leftCol.slotId);
    SColumnInfoData* pRightCol = taosArrayGet(pJoinInfo->pRight->pDataBlock, pJoinInfo->rightCol.slotId);

    // Only timestamp matching is supported for ordinary tables; both keys are read as int64.
    ASSERT(pLeftCol->info.type == TSDB_DATA_TYPE_TIMESTAMP && pRightCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);

    char*   pLeftVal = colDataGetData(pLeftCol, pJoinInfo->leftPos);
    char*   pRightVal = colDataGetData(pRightCol, pJoinInfo->rightPos);
    int64_t leftTs = *(int64_t*)pLeftVal;
    int64_t rightTs = *(int64_t*)pRightVal;

    if (leftTs == rightTs) {
      appendJoinedRow(pOperator, pJoinInfo, pRes, nrows);
      pJoinInfo->leftPos += 1;
      pJoinInfo->rightPos += 1;
      nrows += 1;
    } else if (leftTs < rightTs) {
      pJoinInfo->leftPos += 1;
    } else {
      pJoinInfo->rightPos += 1;
    }

    if (nrows >= pOperator->resultInfo.threshold) {
      break;
    }
  }

  pRes->info.rows = nrows;
  return (pRes->info.rows > 0) ? pRes : NULL;
}