未验证 提交 ac1673fc 编写于 作者: X Xiaoyu Wang 提交者: GitHub

Merge pull request #11463 from taosdata/feature/3.0_wxy

feat(query): distributed splitting of child/normal table JOIN
......@@ -231,6 +231,7 @@ typedef enum EDealRes {
DEAL_RES_CONTINUE = 1,
DEAL_RES_IGNORE_CHILD,
DEAL_RES_ERROR,
DEAL_RES_END
} EDealRes;
typedef EDealRes (*FNodeWalker)(SNode* pNode, void* pContext);
......
......@@ -543,7 +543,7 @@ EDealRes getDBNameFromConditionWalker(SNode* pNode, void* pContext) {
char* dbName = nodesGetValueFromNode(node);
strncpy(pContext, varDataVal(dbName), varDataLen(dbName));
*((char*)pContext + varDataLen(dbName)) = 0;
return DEAL_RES_ERROR; // stop walk
return DEAL_RES_END; // stop walk
}
default:
break;
......
......@@ -46,7 +46,7 @@ static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FNodeWalker walker
case QUERY_NODE_OPERATOR: {
SOperatorNode* pOpNode = (SOperatorNode*)pNode;
res = walkNode(pOpNode->pLeft, order, walker, pContext);
if (DEAL_RES_ERROR != res) {
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = walkNode(pOpNode->pRight, order, walker, pContext);
}
break;
......@@ -63,10 +63,10 @@ static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FNodeWalker walker
case QUERY_NODE_JOIN_TABLE: {
SJoinTableNode* pJoinTableNode = (SJoinTableNode*)pNode;
res = walkNode(pJoinTableNode->pLeft, order, walker, pContext);
if (DEAL_RES_ERROR != res) {
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = walkNode(pJoinTableNode->pRight, order, walker, pContext);
}
if (DEAL_RES_ERROR != res) {
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = walkNode(pJoinTableNode->pOnCond, order, walker, pContext);
}
break;
......@@ -80,7 +80,7 @@ static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FNodeWalker walker
case QUERY_NODE_STATE_WINDOW: {
SStateWindowNode* pState = (SStateWindowNode*)pNode;
res = walkNode(pState->pExpr, order, walker, pContext);
if (DEAL_RES_ERROR != res) {
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = walkNode(pState->pCol, order, walker, pContext);
}
break;
......@@ -88,7 +88,7 @@ static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FNodeWalker walker
case QUERY_NODE_SESSION_WINDOW: {
SSessionWindowNode* pSession = (SSessionWindowNode*)pNode;
res = walkNode(pSession->pCol, order, walker, pContext);
if (DEAL_RES_ERROR != res) {
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = walkNode(pSession->pGap, order, walker, pContext);
}
break;
......@@ -96,16 +96,16 @@ static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FNodeWalker walker
case QUERY_NODE_INTERVAL_WINDOW: {
SIntervalWindowNode* pInterval = (SIntervalWindowNode*)pNode;
res = walkNode(pInterval->pInterval, order, walker, pContext);
if (DEAL_RES_ERROR != res) {
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = walkNode(pInterval->pOffset, order, walker, pContext);
}
if (DEAL_RES_ERROR != res) {
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = walkNode(pInterval->pSliding, order, walker, pContext);
}
if (DEAL_RES_ERROR != res) {
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = walkNode(pInterval->pFill, order, walker, pContext);
}
if (DEAL_RES_ERROR != res) {
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = walkNode(pInterval->pCol, order, walker, pContext);
}
break;
......@@ -126,7 +126,7 @@ static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FNodeWalker walker
break;
}
if (DEAL_RES_ERROR != res && TRAVERSAL_POSTORDER == order) {
if (DEAL_RES_ERROR != res && DEAL_RES_END != res && TRAVERSAL_POSTORDER == order) {
res = walker(pNode, pContext);
}
......@@ -136,8 +136,9 @@ static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FNodeWalker walker
static EDealRes walkList(SNodeList* pNodeList, ETraversalOrder order, FNodeWalker walker, void* pContext) {
SNode* node;
FOREACH(node, pNodeList) {
if (DEAL_RES_ERROR == walkNode(node, order, walker, pContext)) {
return DEAL_RES_ERROR;
EDealRes res = walkNode(node, order, walker, pContext);
if (DEAL_RES_ERROR == res || DEAL_RES_END == res) {
return res;
}
}
return DEAL_RES_CONTINUE;
......@@ -185,7 +186,7 @@ static EDealRes rewriteNode(SNode** pRawNode, ETraversalOrder order, FNodeRewrit
case QUERY_NODE_OPERATOR: {
SOperatorNode* pOpNode = (SOperatorNode*)pNode;
res = rewriteNode(&(pOpNode->pLeft), order, rewriter, pContext);
if (DEAL_RES_ERROR != res) {
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = rewriteNode(&(pOpNode->pRight), order, rewriter, pContext);
}
break;
......@@ -202,10 +203,10 @@ static EDealRes rewriteNode(SNode** pRawNode, ETraversalOrder order, FNodeRewrit
case QUERY_NODE_JOIN_TABLE: {
SJoinTableNode* pJoinTableNode = (SJoinTableNode*)pNode;
res = rewriteNode(&(pJoinTableNode->pLeft), order, rewriter, pContext);
if (DEAL_RES_ERROR != res) {
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = rewriteNode(&(pJoinTableNode->pRight), order, rewriter, pContext);
}
if (DEAL_RES_ERROR != res) {
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = rewriteNode(&(pJoinTableNode->pOnCond), order, rewriter, pContext);
}
break;
......@@ -219,7 +220,7 @@ static EDealRes rewriteNode(SNode** pRawNode, ETraversalOrder order, FNodeRewrit
case QUERY_NODE_STATE_WINDOW: {
SStateWindowNode* pState = (SStateWindowNode*)pNode;
res = rewriteNode(&pState->pExpr, order, rewriter, pContext);
if (DEAL_RES_ERROR != res) {
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = rewriteNode(&pState->pCol, order, rewriter, pContext);
}
break;
......@@ -227,7 +228,7 @@ static EDealRes rewriteNode(SNode** pRawNode, ETraversalOrder order, FNodeRewrit
case QUERY_NODE_SESSION_WINDOW: {
SSessionWindowNode* pSession = (SSessionWindowNode*)pNode;
res = rewriteNode(&pSession->pCol, order, rewriter, pContext);
if (DEAL_RES_ERROR != res) {
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = rewriteNode(&pSession->pGap, order, rewriter, pContext);
}
break;
......@@ -235,16 +236,16 @@ static EDealRes rewriteNode(SNode** pRawNode, ETraversalOrder order, FNodeRewrit
case QUERY_NODE_INTERVAL_WINDOW: {
SIntervalWindowNode* pInterval = (SIntervalWindowNode*)pNode;
res = rewriteNode(&(pInterval->pInterval), order, rewriter, pContext);
if (DEAL_RES_ERROR != res) {
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = rewriteNode(&(pInterval->pOffset), order, rewriter, pContext);
}
if (DEAL_RES_ERROR != res) {
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = rewriteNode(&(pInterval->pSliding), order, rewriter, pContext);
}
if (DEAL_RES_ERROR != res) {
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = rewriteNode(&(pInterval->pFill), order, rewriter, pContext);
}
if (DEAL_RES_ERROR != res) {
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = rewriteNode(&(pInterval->pCol), order, rewriter, pContext);
}
break;
......@@ -265,7 +266,7 @@ static EDealRes rewriteNode(SNode** pRawNode, ETraversalOrder order, FNodeRewrit
break;
}
if (DEAL_RES_ERROR != res && TRAVERSAL_POSTORDER == order) {
if (DEAL_RES_ERROR != res && DEAL_RES_END != res && TRAVERSAL_POSTORDER == order) {
res = rewriter(pRawNode, pContext);
}
......@@ -275,8 +276,9 @@ static EDealRes rewriteNode(SNode** pRawNode, ETraversalOrder order, FNodeRewrit
static EDealRes rewriteList(SNodeList* pNodeList, ETraversalOrder order, FNodeRewriter rewriter, void* pContext) {
SNode** pNode;
FOREACH_FOR_REWRITE(pNode, pNodeList) {
if (DEAL_RES_ERROR == rewriteNode(pNode, order, rewriter, pContext)) {
return DEAL_RES_ERROR;
EDealRes res = rewriteNode(pNode, order, rewriter, pContext);
if (DEAL_RES_ERROR == res || DEAL_RES_END == res) {
return res;
}
}
return DEAL_RES_CONTINUE;
......
......@@ -335,8 +335,26 @@ SNode* createLogicConditionNode(SAstCreateContext* pCxt, ELogicConditionType typ
CHECK_OUT_OF_MEM(cond);
cond->condType = type;
cond->pParameterList = nodesMakeList();
nodesListAppend(cond->pParameterList, pParam1);
nodesListAppend(cond->pParameterList, pParam2);
if ((QUERY_NODE_LOGIC_CONDITION == nodeType(pParam1) && type != ((SLogicConditionNode*)pParam1)->condType) ||
(QUERY_NODE_LOGIC_CONDITION == nodeType(pParam2) && type != ((SLogicConditionNode*)pParam2)->condType)) {
nodesListAppend(cond->pParameterList, pParam1);
nodesListAppend(cond->pParameterList, pParam2);
} else {
if (QUERY_NODE_LOGIC_CONDITION == nodeType(pParam1)) {
nodesListAppendList(cond->pParameterList, ((SLogicConditionNode*)pParam1)->pParameterList);
((SLogicConditionNode*)pParam1)->pParameterList = NULL;
nodesDestroyNode(pParam1);
} else {
nodesListAppend(cond->pParameterList, pParam1);
}
if (QUERY_NODE_LOGIC_CONDITION == nodeType(pParam2)) {
nodesListAppendList(cond->pParameterList, ((SLogicConditionNode*)pParam2)->pParameterList);
((SLogicConditionNode*)pParam2)->pParameterList = NULL;
nodesDestroyNode(pParam2);
} else {
nodesListAppend(cond->pParameterList, pParam2);
}
}
return (SNode*)cond;
}
......
......@@ -146,6 +146,7 @@ public:
meta_[db][tbname].reset(new MockTableMeta());
meta_[db][tbname]->schema = table.release();
meta_[db][tbname]->schema->uid = id_++;
meta_[db][tbname]->schema->tableType = TSDB_CHILD_TABLE;
SVgroupInfo vgroup = {.vgId = vgid, .hashBegin = 0, .hashEnd = 0,};
addEpIntoEpSet(&vgroup.epSet, "dnode_1", 6030);
......@@ -197,11 +198,11 @@ public:
std::cout << "Table:" << table.first << std::endl;
std::cout << SH("Field") << SH("Type") << SH("DataType") << IH("Bytes") << std::endl;
std::cout << SL(3, 1) << std::endl;
int16_t numOfTags = schema->tableInfo.numOfTags;
int16_t numOfFields = numOfTags + schema->tableInfo.numOfColumns;
int16_t numOfColumns = schema->tableInfo.numOfColumns;
int16_t numOfFields = numOfColumns + schema->tableInfo.numOfTags;
for (int16_t i = 0; i < numOfFields; ++i) {
const SSchema* col = schema->schema + i;
std::cout << SF(std::string(col->name)) << SH(ftToString(i, numOfTags)) << SH(dtToString(col->type)) << IF(col->bytes) << std::endl;
std::cout << SF(std::string(col->name)) << SH(ftToString(i, numOfColumns)) << SH(dtToString(col->type)) << IF(col->bytes) << std::endl;
}
std::cout << std::endl;
}
......@@ -262,8 +263,8 @@ private:
return tDataTypes[type].name;
}
std::string ftToString(int16_t colid, int16_t numOfTags) const {
return (0 == colid ? "column" : (colid <= numOfTags ? "tag" : "column"));
std::string ftToString(int16_t colid, int16_t numOfColumns) const {
return (0 == colid ? "column" : (colid <= numOfColumns ? "tag" : "column"));
}
STableMeta* getTableSchemaMeta(const std::string& db, const std::string& tbname) const {
......
......@@ -694,7 +694,6 @@ static int32_t createPartitionLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pS
}
return code;
return TSDB_CODE_SUCCESS;
}
static int32_t createDistinctLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SLogicNode** pLogicNode) {
......
......@@ -41,6 +41,21 @@ typedef struct SOsdInfo {
SNodeList* pDsoFuncs;
} SOsdInfo;
typedef struct SCpdIsMultiTableCondCxt {
SNodeList* pLeftCols;
SNodeList* pRightCols;
bool havaLeftCol;
bool haveRightCol;
} SCpdIsMultiTableCondCxt;
typedef enum ECondAction {
COND_ACTION_STAY = 1,
COND_ACTION_PUSH_JOIN,
COND_ACTION_PUSH_LEFT_CHILD,
COND_ACTION_PUSH_RIGHT_CHILD
// after supporting outer join, there are other possibilities
} ECondAction;
static bool osdMayBeOptimized(SLogicNode* pNode) {
if (OPTIMIZE_FLAG_TEST_MASK(pNode->optimizedFlag, OPTIMIZE_FLAG_OSD)) {
return false;
......@@ -152,36 +167,229 @@ static int32_t cpdOptimizeScanCondition(SOptimizeContext* pCxt, SScanLogicNode*
return TSDB_CODE_SUCCESS;
}
static int32_t cpdPartitionCondition(SJoinLogicNode* pJoin, SNodeList** pMultiTableCond, SNodeList** pSingleTableCond) {
// todo
return TSDB_CODE_SUCCESS;
static bool belongThisTable(SNode* pCondCol, SNodeList* pTableCols) {
SNode* pTableCol = NULL;
FOREACH(pTableCol, pTableCols) {
if (nodesEqualNode(pCondCol, pTableCol)) {
return true;
}
}
return false;
}
static int32_t cpdPushJoinCondToOnCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin, SNodeList* pMultiTableCond) {
// todo
return TSDB_CODE_SUCCESS;
static EDealRes cpdIsMultiTableCondImpl(SNode* pNode, void* pContext) {
SCpdIsMultiTableCondCxt* pCxt = pContext;
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
if (belongThisTable(pNode, pCxt->pLeftCols)) {
pCxt->havaLeftCol = true;
} else if (belongThisTable(pNode, pCxt->pRightCols)) {
pCxt->haveRightCol = true;
}
return pCxt->havaLeftCol && pCxt->haveRightCol ? DEAL_RES_END : DEAL_RES_CONTINUE;
}
return DEAL_RES_CONTINUE;
}
static int32_t cpdPushJoinCondToChildren(SOptimizeContext* pCxt, SJoinLogicNode* pJoin, SNodeList* pSingleTableCond) {
// todo
static ECondAction cpdCondAction(EJoinType joinType, SNodeList* pLeftCols, SNodeList* pRightCols, SNode* pNode) {
SCpdIsMultiTableCondCxt cxt = { .pLeftCols = pLeftCols, .pRightCols = pRightCols, .havaLeftCol = false, .haveRightCol = false };
nodesWalkExpr(pNode, cpdIsMultiTableCondImpl, &cxt);
return (JOIN_TYPE_INNER != joinType ? COND_ACTION_STAY :
(cxt.havaLeftCol && cxt.haveRightCol ? COND_ACTION_PUSH_JOIN : (cxt.havaLeftCol ? COND_ACTION_PUSH_LEFT_CHILD : COND_ACTION_PUSH_RIGHT_CHILD)));
}
static int32_t cpdMakeCond(SNodeList** pConds, SNode** pCond) {
if (NULL == *pConds) {
return TSDB_CODE_SUCCESS;
}
if (1 == LIST_LENGTH(*pConds)) {
*pCond = nodesListGetNode(*pConds, 0);
nodesClearList(*pConds);
} else {
SLogicConditionNode* pLogicCond = nodesMakeNode(QUERY_NODE_LOGIC_CONDITION);
if (NULL == pLogicCond) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pLogicCond->condType = LOGIC_COND_TYPE_AND;
pLogicCond->pParameterList = *pConds;
*pCond = (SNode*)pLogicCond;
}
*pConds = NULL;
return TSDB_CODE_SUCCESS;
}
static int32_t cpdPushJoinCondition(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
if (NULL != pJoin->node.pConditions) {
SNodeList* pMultiTableCond = NULL;
SNodeList* pSingleTableCond = NULL;
int32_t code = cpdPartitionCondition(pJoin, &pMultiTableCond, &pSingleTableCond);
if (TSDB_CODE_SUCCESS == code && NULL != pMultiTableCond) {
code = cpdPushJoinCondToOnCond(pCxt, pJoin, pMultiTableCond);
static int32_t cpdPartitionLogicCond(SJoinLogicNode* pJoin, SNode** pOnCond, SNode** pLeftChildCond, SNode** pRightChildCond) {
SLogicConditionNode* pLogicCond = (SLogicConditionNode*)pJoin->node.pConditions;
if (LOGIC_COND_TYPE_AND != pLogicCond->condType) {
return TSDB_CODE_SUCCESS;
}
SNodeList* pLeftCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0))->pTargets;
SNodeList* pRightCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1))->pTargets;
int32_t code = TSDB_CODE_SUCCESS;
SNodeList* pOnConds = NULL;
SNodeList* pLeftChildConds = NULL;
SNodeList* pRightChildConds = NULL;
SNodeList* pRemainConds = NULL;
SNode* pCond = NULL;
FOREACH(pCond, pLogicCond->pParameterList) {
ECondAction condAction = cpdCondAction(pJoin->joinType, pLeftCols, pRightCols, pCond);
if (COND_ACTION_PUSH_JOIN == condAction) {
code = nodesListMakeAppend(&pOnConds, nodesCloneNode(pCond));
} else if (COND_ACTION_PUSH_LEFT_CHILD == condAction) {
code = nodesListMakeAppend(&pLeftChildConds, nodesCloneNode(pCond));
} else if (COND_ACTION_PUSH_RIGHT_CHILD == condAction) {
code = nodesListMakeAppend(&pRightChildConds, nodesCloneNode(pCond));
} else {
code = nodesListMakeAppend(&pRemainConds, nodesCloneNode(pCond));
}
if (TSDB_CODE_SUCCESS == code && NULL != pSingleTableCond) {
code = cpdPushJoinCondToChildren(pCxt, pJoin, pSingleTableCond);
if (TSDB_CODE_SUCCESS != code) {
break;
}
}
SNode* pTempOnCond = NULL;
SNode* pTempLeftChildCond = NULL;
SNode* pTempRightChildCond = NULL;
SNode* pTempRemainCond = NULL;
if (TSDB_CODE_SUCCESS == code) {
code = cpdMakeCond(&pOnConds, &pTempOnCond);
}
if (TSDB_CODE_SUCCESS == code) {
code = cpdMakeCond(&pLeftChildConds, &pTempLeftChildCond);
}
if (TSDB_CODE_SUCCESS == code) {
code = cpdMakeCond(&pRightChildConds, &pTempRightChildCond);
}
if (TSDB_CODE_SUCCESS == code) {
code = cpdMakeCond(&pRemainConds, &pTempRemainCond);
}
if (TSDB_CODE_SUCCESS == code) {
*pOnCond = pTempOnCond;
*pLeftChildCond = pTempLeftChildCond;
*pRightChildCond = pTempRightChildCond;
nodesDestroyNode(pJoin->node.pConditions);
pJoin->node.pConditions = pTempRemainCond;
} else {
nodesDestroyList(pOnConds);
nodesDestroyList(pLeftChildConds);
nodesDestroyList(pRightChildConds);
nodesDestroyList(pRemainConds);
nodesDestroyNode(pTempOnCond);
nodesDestroyNode(pTempLeftChildCond);
nodesDestroyNode(pTempRightChildCond);
nodesDestroyNode(pTempRemainCond);
}
return code;
}
static int32_t cpdPartitionOpCond(SJoinLogicNode* pJoin, SNode** pOnCond, SNode** pLeftChildCond, SNode** pRightChildCond) {
SNodeList* pLeftCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0))->pTargets;
SNodeList* pRightCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1))->pTargets;
ECondAction condAction = cpdCondAction(pJoin->joinType, pLeftCols, pRightCols, pJoin->node.pConditions);
if (COND_ACTION_STAY == condAction) {
return TSDB_CODE_SUCCESS;
} else if (COND_ACTION_PUSH_JOIN == condAction) {
*pOnCond = pJoin->node.pConditions;
} else if (COND_ACTION_PUSH_LEFT_CHILD == condAction) {
*pLeftChildCond = pJoin->node.pConditions;
} else if (COND_ACTION_PUSH_RIGHT_CHILD == condAction) {
*pRightChildCond = pJoin->node.pConditions;
}
pJoin->node.pConditions = NULL;
return TSDB_CODE_SUCCESS;
}
static int32_t cpdPartitionCond(SJoinLogicNode* pJoin, SNode** pOnCond, SNode** pLeftChildCond, SNode** pRightChildCond) {
if (QUERY_NODE_LOGIC_CONDITION == nodeType(pJoin->node.pConditions)) {
return cpdPartitionLogicCond(pJoin, pOnCond, pLeftChildCond, pRightChildCond);
} else {
return cpdPartitionOpCond(pJoin, pOnCond, pLeftChildCond, pRightChildCond);
}
}
static int32_t cpdCondAppend(SOptimizeContext* pCxt, SNode** pCond, SNode** pAdditionalCond) {
if (NULL == *pCond) {
TSWAP(*pCond, *pAdditionalCond, SNode*);
return TSDB_CODE_SUCCESS;
}
int32_t code = TSDB_CODE_SUCCESS;
if (QUERY_NODE_LOGIC_CONDITION == nodeType(*pCond)) {
code = nodesListAppend(((SLogicConditionNode*)*pCond)->pParameterList, *pAdditionalCond);
if (TSDB_CODE_SUCCESS == code) {
*pAdditionalCond = NULL;
}
} else {
SLogicConditionNode* pLogicCond = nodesMakeNode(QUERY_NODE_LOGIC_CONDITION);
if (NULL == pLogicCond) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pLogicCond->condType = LOGIC_COND_TYPE_AND;
code = nodesListMakeAppend(&pLogicCond->pParameterList, *pAdditionalCond);
if (TSDB_CODE_SUCCESS == code) {
*pAdditionalCond = NULL;
code = nodesListMakeAppend(&pLogicCond->pParameterList, *pCond);
}
if (TSDB_CODE_SUCCESS == code) {
*pCond = (SNode*)pLogicCond;
} else {
nodesDestroyNode(pLogicCond);
}
}
return code;
}
static int32_t cpdPushCondToOnCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin, SNode** pCond) {
return cpdCondAppend(pCxt, &pJoin->pOnConditions, pCond);
}
static int32_t cpdPushCondToScan(SOptimizeContext* pCxt, SScanLogicNode* pScan, SNode** pCond) {
return cpdCondAppend(pCxt, &pScan->node.pConditions, pCond);
}
static int32_t cpdPushCondToChild(SOptimizeContext* pCxt, SLogicNode* pChild, SNode** pCond) {
switch (nodeType(pChild)) {
case QUERY_NODE_LOGIC_PLAN_SCAN:
return cpdPushCondToScan(pCxt, (SScanLogicNode*)pChild, pCond);
default:
break;
}
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
static int32_t cpdPushJoinCondition(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
if (NULL == pJoin->node.pConditions) {
return TSDB_CODE_SUCCESS;
}
SNode* pOnCond = NULL;
SNode* pLeftChildCond = NULL;
SNode* pRightChildCond = NULL;
int32_t code = cpdPartitionCond(pJoin, &pOnCond, &pLeftChildCond, &pRightChildCond);
if (TSDB_CODE_SUCCESS == code && NULL != pOnCond) {
code = cpdPushCondToOnCond(pCxt, pJoin, &pOnCond);
}
if (TSDB_CODE_SUCCESS == code && NULL != pLeftChildCond) {
code = cpdPushCondToChild(pCxt, (SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0), &pLeftChildCond);
}
if (TSDB_CODE_SUCCESS == code && NULL != pRightChildCond) {
code = cpdPushCondToChild(pCxt, (SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1), &pRightChildCond);
}
if (TSDB_CODE_SUCCESS != code) {
nodesDestroyNode(pOnCond);
nodesDestroyNode(pLeftChildCond);
nodesDestroyNode(pRightChildCond);
}
return code;
}
static int32_t cpdPushAggCondition(SOptimizeContext* pCxt, SAggLogicNode* pAgg) {
// todo
return TSDB_CODE_SUCCESS;
......
......@@ -18,6 +18,7 @@
#define SPLIT_FLAG_MASK(n) (1 << n)
#define SPLIT_FLAG_STS SPLIT_FLAG_MASK(0)
#define SPLIT_FLAG_CTJ SPLIT_FLAG_MASK(1)
#define SPLIT_FLAG_SET_MASK(val, mask) (val) |= (mask)
#define SPLIT_FLAG_TEST_MASK(val, mask) (((val) & (mask)) != 0)
......@@ -39,43 +40,14 @@ typedef struct SStsInfo {
SLogicSubplan* pSubplan;
} SStsInfo;
static SLogicNode* stsMatchByNode(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode) &&
NULL != ((SScanLogicNode*)pNode)->pVgroupList && ((SScanLogicNode*)pNode)->pVgroupList->numOfVgroups > 1) {
return pNode;
}
SNode* pChild;
FOREACH(pChild, pNode->pChildren) {
SLogicNode* pSplitNode = stsMatchByNode((SLogicNode*)pChild);
if (NULL != pSplitNode) {
return pSplitNode;
}
}
return NULL;
}
typedef struct SCtjInfo {
SScanLogicNode* pScan;
SLogicSubplan* pSubplan;
} SCtjInfo;
static void stsFindSplitNode(SLogicSubplan* pSubplan, SStsInfo* pInfo) {
SLogicNode* pSplitNode = stsMatchByNode(pSubplan->pNode);
if (NULL != pSplitNode) {
pInfo->pScan = (SScanLogicNode*)pSplitNode;
pInfo->pSubplan = pSubplan;
}
}
static void stsMatch(SSplitContext* pCxt, SLogicSubplan* pSubplan, SStsInfo* pInfo) {
if (!SPLIT_FLAG_TEST_MASK(pSubplan->splitFlag, SPLIT_FLAG_STS)) {
stsFindSplitNode(pSubplan, pInfo);
}
SNode* pChild;
FOREACH(pChild, pSubplan->pChildren) {
stsMatch(pCxt, (SLogicSubplan*)pChild, pInfo);
if (NULL != pInfo->pScan) {
break;
}
}
return;
}
typedef bool (*FSplFindSplitNode)(SLogicSubplan* pSubplan, SStsInfo* pInfo);
static SLogicSubplan* stsCreateScanSubplan(SSplitContext* pCxt, SScanLogicNode* pScan) {
static SLogicSubplan* splCreateScanSubplan(SSplitContext* pCxt, SScanLogicNode* pScan, int32_t flag) {
SLogicSubplan* pSubplan = nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN);
if (NULL == pSubplan) {
return NULL;
......@@ -84,11 +56,11 @@ static SLogicSubplan* stsCreateScanSubplan(SSplitContext* pCxt, SScanLogicNode*
pSubplan->subplanType = SUBPLAN_TYPE_SCAN;
pSubplan->pNode = (SLogicNode*)nodesCloneNode(pScan);
TSWAP(pSubplan->pVgroupList, ((SScanLogicNode*)pSubplan->pNode)->pVgroupList, SVgroupsInfo*);
SPLIT_FLAG_SET_MASK(pSubplan->splitFlag, SPLIT_FLAG_STS);
SPLIT_FLAG_SET_MASK(pSubplan->splitFlag, flag);
return pSubplan;
}
static int32_t stsCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SScanLogicNode* pScan) {
static int32_t splCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SScanLogicNode* pScan, ESubplanType subplanType) {
SExchangeLogicNode* pExchange = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE);
if (NULL == pExchange) {
return TSDB_CODE_OUT_OF_MEMORY;
......@@ -119,10 +91,100 @@ static int32_t stsCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubpla
return TSDB_CODE_FAILED;
}
static bool splMatch(SSplitContext* pCxt, SLogicSubplan* pSubplan, int32_t flag, FSplFindSplitNode func, void* pInfo) {
if (!SPLIT_FLAG_TEST_MASK(pSubplan->splitFlag, flag)) {
if (func(pSubplan, pInfo)) {
return true;
}
}
SNode* pChild;
FOREACH(pChild, pSubplan->pChildren) {
if (splMatch(pCxt, (SLogicSubplan*)pChild, flag, func, pInfo)) {
return true;
}
}
return false;
}
static SLogicNode* stsMatchByNode(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode) &&
NULL != ((SScanLogicNode*)pNode)->pVgroupList && ((SScanLogicNode*)pNode)->pVgroupList->numOfVgroups > 1) {
return pNode;
}
SNode* pChild;
FOREACH(pChild, pNode->pChildren) {
SLogicNode* pSplitNode = stsMatchByNode((SLogicNode*)pChild);
if (NULL != pSplitNode) {
return pSplitNode;
}
}
return NULL;
}
static bool stsFindSplitNode(SLogicSubplan* pSubplan, SStsInfo* pInfo) {
SLogicNode* pSplitNode = stsMatchByNode(pSubplan->pNode);
if (NULL != pSplitNode) {
pInfo->pScan = (SScanLogicNode*)pSplitNode;
pInfo->pSubplan = pSubplan;
}
return NULL != pSplitNode;
}
static int32_t stsSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
SStsInfo info = {0};
stsMatch(pCxt, pSubplan, &info);
if (NULL == info.pScan) {
if (!splMatch(pCxt, pSubplan, SPLIT_FLAG_STS, stsFindSplitNode, &info)) {
return TSDB_CODE_SUCCESS;
}
if (NULL == info.pSubplan->pChildren) {
info.pSubplan->pChildren = nodesMakeList();
if (NULL == info.pSubplan->pChildren) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
int32_t code = nodesListStrictAppend(info.pSubplan->pChildren, splCreateScanSubplan(pCxt, info.pScan, SPLIT_FLAG_STS));
if (TSDB_CODE_SUCCESS == code) {
code = splCreateExchangeNode(pCxt, info.pSubplan, info.pScan, SUBPLAN_TYPE_MERGE);
}
++(pCxt->groupId);
pCxt->split = true;
return code;
}
static bool ctjIsSingleTable(int8_t tableType) {
return (TSDB_CHILD_TABLE == tableType || TSDB_NORMAL_TABLE == tableType);
}
static SLogicNode* ctjMatchByNode(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pNode)) {
SLogicNode* pLeft = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0);
SLogicNode* pRight = (SLogicNode*)nodesListGetNode(pNode->pChildren, 1);
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pLeft) && ctjIsSingleTable(((SScanLogicNode*)pLeft)->pMeta->tableType) &&
QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pRight) && ctjIsSingleTable(((SScanLogicNode*)pRight)->pMeta->tableType)) {
return pRight;
}
}
SNode* pChild;
FOREACH(pChild, pNode->pChildren) {
SLogicNode* pSplitNode = ctjMatchByNode((SLogicNode*)pChild);
if (NULL != pSplitNode) {
return pSplitNode;
}
}
return NULL;
}
static bool ctjFindSplitNode(SLogicSubplan* pSubplan, SStsInfo* pInfo) {
SLogicNode* pSplitNode = ctjMatchByNode(pSubplan->pNode);
if (NULL != pSplitNode) {
pInfo->pScan = (SScanLogicNode*)pSplitNode;
pInfo->pSubplan = pSubplan;
}
return NULL != pSplitNode;
}
static int32_t ctjSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
SCtjInfo info = {0};
if (!splMatch(pCxt, pSubplan, SPLIT_FLAG_CTJ, ctjFindSplitNode, &info)) {
return TSDB_CODE_SUCCESS;
}
if (NULL == info.pSubplan->pChildren) {
......@@ -131,9 +193,9 @@ static int32_t stsSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
int32_t code = nodesListStrictAppend(info.pSubplan->pChildren, stsCreateScanSubplan(pCxt, info.pScan));
int32_t code = nodesListStrictAppend(info.pSubplan->pChildren, splCreateScanSubplan(pCxt, info.pScan, SPLIT_FLAG_CTJ));
if (TSDB_CODE_SUCCESS == code) {
code = stsCreateExchangeNode(pCxt, info.pSubplan, info.pScan);
code = splCreateExchangeNode(pCxt, info.pSubplan, info.pScan, info.pSubplan->subplanType);
}
++(pCxt->groupId);
pCxt->split = true;
......@@ -141,7 +203,8 @@ static int32_t stsSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
}
static const SSplitRule splitRuleSet[] = {
{ .pName = "SuperTableScan", .splitFunc = stsSplit }
{ .pName = "SuperTableScan", .splitFunc = stsSplit },
{ .pName = "ChildTableJoin", .splitFunc = ctjSplit },
};
static const int32_t splitRuleNum = (sizeof(splitRuleSet) / sizeof(SSplitRule));
......
......@@ -70,6 +70,12 @@ protected:
cout << "unformatted logic plan : " << endl;
cout << toString((const SNode*)pLogicNode, false) << endl;
code = optimizeLogicPlan(&cxt, pLogicNode);
if (code != TSDB_CODE_SUCCESS) {
cout << "sql:[" << cxt_.pSql << "] optimizeLogicPlan code:" << code << ", strerror:" << tstrerror(code) << endl;
return false;
}
SLogicSubplan* pLogicSubplan = nullptr;
code = splitLogicPlan(&cxt, pLogicNode, &pLogicSubplan);
if (code != TSDB_CODE_SUCCESS) {
......@@ -174,13 +180,13 @@ TEST_F(PlannerTest, selectStableBasic) {
TEST_F(PlannerTest, selectJoin) {
setDatabase("root", "test");
bind("SELECT * FROM st1s1 t1, st1s2 t2 where t1.ts = t2.ts");
ASSERT_TRUE(run());
// bind("SELECT t1.c1, t2.c2 FROM st1s1 t1, st1s2 t2 where t1.ts = t2.ts");
// ASSERT_TRUE(run());
bind("SELECT * FROM st1s1 t1 join st1s2 t2 on t1.ts = t2.ts where t1.c1 > t2.c1");
ASSERT_TRUE(run());
// bind("SELECT t1.*, t2.* FROM st1s1 t1, st1s2 t2 where t1.ts = t2.ts");
// ASSERT_TRUE(run());
bind("SELECT t1.* FROM st1s1 t1 join st1s2 t2 on t1.ts = t2.ts where t1.c1 > t2.c1");
bind("SELECT t1.c1, t2.c1 FROM st1s1 t1 join st1s2 t2 on t1.ts = t2.ts where t1.c1 > t2.c1 and t1.c2 = 'abc' and t2.c2 = 'qwe'");
ASSERT_TRUE(run());
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册