提交 c490820b 编写于 作者: H Haojun Liao

feat(query): add the merge join operator for child table inner join.

上级 79596698
......@@ -549,8 +549,6 @@ typedef struct SStateWindowOperatorInfo {
typedef struct SSortedMergeOperatorInfo {
SOptrBasicInfo binfo;
bool hasVarCol;
SArray* pSortInfo;
int32_t numOfSources;
SSortHandle *pSortHandle;
......@@ -582,6 +580,24 @@ typedef struct SSortOperatorInfo {
uint64_t totalElapsed; // total elapsed time
} SSortOperatorInfo;
typedef struct SJoinOperatorInfo {
SSDataBlock *pRes;
int32_t joinType;
SSDataBlock *pLeft;
int32_t leftPos;
SColumnInfo leftCol;
SSDataBlock *pRight;
int32_t rightPos;
SColumnInfo rightCol;
SNode *pOnCondition;
// SJoinStatus *status;
// int32_t numOfUpstream;
// SRspResultInfo resultInfo;
} SJoinOperatorInfo;
int32_t operatorDummyOpenFn(SOperatorInfo* pOperator);
void operatorDummyCloseFn(void* param, int32_t numOfCols);
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num);
......@@ -628,6 +644,8 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo*
SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo);
#if 0
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
......@@ -635,9 +653,6 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntim
SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createTagScanOperatorInfo(SReaderHandle* pReaderHandle, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema,
int32_t numOfOutput);
#endif
void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx, int32_t numOfOutput, SArray* pPseudoList);
......
......@@ -6679,8 +6679,7 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
SFunctionNode* pFuncNode = (SFunctionNode*)pTargetNode->pExpr;
SDataType* pType = &pFuncNode->node.resType;
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale,
pType->precision, pFuncNode->node.aliasName);
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pType->precision, pFuncNode->node.aliasName);
pExp->pExpr->_function.functionId = pFuncNode->funcId;
pExp->pExpr->_function.pFunctNode = pFuncNode;
......@@ -6764,6 +6763,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
int32_t numOfCols = 0;
tsdbReaderT pDataReader = doCreateDataReader((STableScanPhysiNode*)pPhyNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId);
if (pDataReader == NULL) {
return NULL;
}
SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols);
SSDataBlock* pResBlock = createOutputBuf_rv1(pScanPhyNode->node.pOutputDataBlockDesc);
......@@ -6802,13 +6805,15 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
}
}
int32_t num = 0;
int32_t type = nodeType(pPhyNode);
size_t size = LIST_LENGTH(pPhyNode->pChildren);
ASSERT(size == 1);
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0);
SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
int32_t num = 0;
SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES);
for(int32_t i = 0; i < size; ++i) {
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
}
if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
SProjectPhysiNode* pProjPhyNode = (SProjectPhysiNode*) pPhyNode;
......@@ -6817,7 +6822,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
SLimit limit = {.limit = pProjPhyNode->limit, .offset = pProjPhyNode->offset};
SLimit slimit = {.limit = pProjPhyNode->slimit, .offset = pProjPhyNode->soffset};
return createProjectOperatorInfo(op, pExprInfo, num, pResBlock, &limit, &slimit, pTaskInfo);
return createProjectOperatorInfo(ops[0], pExprInfo, num, pResBlock, &limit, &slimit, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_AGG == type) {
SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode;
SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
......@@ -6831,9 +6836,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr);
}
return createGroupOperatorInfo(op, pExprInfo, num, pResBlock, pColList, pAggNode->node.pConditions, pScalarExprInfo, numOfScalarExpr, pTaskInfo, NULL);
return createGroupOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pAggNode->node.pConditions, pScalarExprInfo, numOfScalarExpr, pTaskInfo, NULL);
} else {
return createAggregateOperatorInfo(op, pExprInfo, num, pResBlock, pTaskInfo, pTableGroupInfo);
return createAggregateOperatorInfo(ops[0], pExprInfo, num, pResBlock, pTaskInfo, pTableGroupInfo);
}
} else if (QUERY_NODE_PHYSICAL_PLAN_INTERVAL == type) {
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
......@@ -6851,33 +6856,39 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
};
int32_t primaryTsSlotId = ((SColumnNode*) pIntervalPhyNode->window.pTspk)->slotId;
return createIntervalOperatorInfo(op, pExprInfo, num, pResBlock, &interval, primaryTsSlotId, pTableGroupInfo, pTaskInfo);
return createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, primaryTsSlotId, pTableGroupInfo, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) {
SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode;
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
SArray* info = createSortInfo(pSortPhyNode->pSortKeys, pSortPhyNode->pTargets);
SArray* slotMap = createIndexMap(pSortPhyNode->pTargets);
return createSortOperatorInfo(op, pResBlock, info, slotMap, pTaskInfo);
return createSortOperatorInfo(ops[0], pResBlock, info, slotMap, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) {
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &num);
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
return createSessionAggOperatorInfo(op, pExprInfo, num, pResBlock, pSessionNode->gap, pTaskInfo);
return createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
SPartitionPhysiNode* pPartNode = (SPartitionPhysiNode*) pPhyNode;
SArray* pColList = extractPartitionColInfo(pPartNode->pPartitionKeys);
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &num);
return createPartitionOperatorInfo(op, pExprInfo, num, pResBlock, pColList, pTaskInfo, NULL);
} else if (QUERY_NODE_STATE_WINDOW == type) {
return createPartitionOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pTaskInfo, NULL);
} else if (QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW == type) {
SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*) pPhyNode;
SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num);
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
return createStatewindowOperatorInfo(op, pExprInfo, num, pResBlock, pTaskInfo);
return createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_JOIN == type) {
SJoinPhysiNode* pJoinNode = (SJoinPhysiNode*) pPhyNode;
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &num);
return createJoinOperatorInfo(ops, size, pExprInfo, num, pResBlock, pJoinNode->pOnConditions, pTaskInfo);
} else {
ASSERT(0);
} /*else if (pPhyNode->info.type == OP_MultiTableAggregate) {
......@@ -7374,4 +7385,135 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo *operatorInfo, SExplainExecInfo
return TSDB_CODE_SUCCESS;
}
static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator, bool* newgroup) {
SJoinOperatorInfo* pJoinInfo = pOperator->info;
// SOptrBasicInfo* pInfo = &pJoinInfo->binfo;
SSDataBlock* pRes = pJoinInfo->pRes;
blockDataCleanup(pRes);
blockDataEnsureCapacity(pRes, 4096);
int32_t nrows = 0;
while (1) {
bool prevVal = *newgroup;
if (pJoinInfo->pLeft == NULL || pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) {
SOperatorInfo* ds1 = pOperator->pDownstream[0];
publishOperatorProfEvent(ds1, QUERY_PROF_BEFORE_OPERATOR_EXEC);
pJoinInfo->pLeft = ds1->getNextFn(ds1, newgroup);
publishOperatorProfEvent(ds1, QUERY_PROF_AFTER_OPERATOR_EXEC);
pJoinInfo->leftPos = 0;
if (pJoinInfo->pLeft == NULL) {
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
break;
}
}
if (pJoinInfo->pRight == NULL || pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) {
SOperatorInfo* ds2 = pOperator->pDownstream[1];
publishOperatorProfEvent(ds2, QUERY_PROF_BEFORE_OPERATOR_EXEC);
pJoinInfo->pRight = ds2->getNextFn(ds2, newgroup);
publishOperatorProfEvent(ds2, QUERY_PROF_AFTER_OPERATOR_EXEC);
pJoinInfo->rightPos = 0;
if (pJoinInfo->pRight == NULL) {
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
break;
}
}
SColumnInfoData* pLeftCol = taosArrayGet(pJoinInfo->pLeft->pDataBlock, pJoinInfo->leftCol.slotId);
char* pLeftVal = colDataGetData(pLeftCol, pJoinInfo->leftPos);
SColumnInfoData* pRightCol = taosArrayGet(pJoinInfo->pRight->pDataBlock, pJoinInfo->rightCol.slotId);
char* pRightVal = colDataGetData(pRightCol, pJoinInfo->rightPos);
// only the timestamp match support for ordinary table
ASSERT(pLeftCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
if (*(int64_t*) pLeftVal == *(int64_t*) pRightVal) {
for(int32_t i = 0; i < pOperator->numOfOutput; ++i) {
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, i);
SExprInfo* pExprInfo = &pOperator->pExpr[i];
int32_t blockId = pExprInfo->base.pParam[0].pCol->dataBlockId;
int32_t slotId = pExprInfo->base.pParam[0].pCol->slotId;
SColumnInfoData* pSrc = NULL;
if (pJoinInfo->pLeft->info.blockId == blockId) {
pSrc = taosArrayGet(pJoinInfo->pLeft->pDataBlock, slotId);
} else {
pSrc = taosArrayGet(pJoinInfo->pRight->pDataBlock, slotId);
}
if (colDataIsNull_s(pSrc, pJoinInfo->leftPos)) {
colDataAppendNULL(pDst, nrows);
} else {
char* p = colDataGetData(pSrc, pJoinInfo->leftPos);
colDataAppend(pDst, nrows, p, false);
}
}
pJoinInfo->leftPos += 1;
pJoinInfo->rightPos += 1;
nrows += 1;
} else if (*(int64_t*) pLeftVal < *(int64_t*) pRightVal) {
pJoinInfo->leftPos += 1;
if (pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) {
continue;
}
} else if (*(int64_t*) pLeftVal > *(int64_t*) pRightVal) {
pJoinInfo->rightPos += 1;
if (pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) {
continue;
}
}
// the pDataBlock are always the same one, no need to call this again
pRes->info.rows = nrows;
if (pRes->info.rows >= pOperator->resultInfo.threshold) {
break;
}
}
return (pRes->info.rows > 0) ? pRes : NULL;
}
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo) {
SJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SJoinOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pOperator == NULL || pInfo == NULL) {
goto _error;
}
pOperator->resultInfo.capacity = 4096;
pOperator->resultInfo.threshold = 4096 * 0.75;
// initResultRowInf
// o(&pInfo->binfo.resultRowInfo, 8);
pInfo->pRes = pResBlock;
pOperator->name = "JoinOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_JOIN;
pOperator->blockingOptr = true;
pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExprInfo;
pOperator->numOfOutput = numOfCols;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->getNextFn = doMergeJoin;
pOperator->closeFn = destroyBasicOperatorInfo;
int32_t code = appendDownstream(pOperator, pDownstream, numOfDownstream);
return pOperator;
_error:
taosMemoryFree(pInfo);
taosMemoryFree(pOperator);
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册