提交 ee9190e9 编写于 作者: X Xiaoyu Wang

fix: subplans under set operator use different group ids

上级 479218fb
......@@ -165,7 +165,8 @@ typedef struct SVnodeModifyLogicNode {
typedef struct SExchangeLogicNode {
SLogicNode node;
int32_t srcGroupId;
int32_t srcStartGroupId;
int32_t srcEndGroupId;
} SExchangeLogicNode;
typedef struct SMergeLogicNode {
......@@ -399,7 +400,10 @@ typedef struct SDownstreamSourceNode {
typedef struct SExchangePhysiNode {
SPhysiNode node;
int32_t srcGroupId; // group id of datasource suplans
// for set operators, there will be multiple execution groups under one exchange, and the ids of these execution
// groups are consecutive
int32_t srcStartGroupId;
int32_t srcEndGroupId;
bool singleChannel;
SNodeList* pSrcEndPoints; // element is SDownstreamSource, scheduler fill by calling qSetSuplanExecutionNode
} SExchangePhysiNode;
......
......@@ -764,9 +764,9 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
}
case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: {
SExchangePhysiNode *pExchNode = (SExchangePhysiNode *)pNode;
SExplainGroup *group = taosHashGet(ctx->groupHash, &pExchNode->srcGroupId, sizeof(pExchNode->srcGroupId));
SExplainGroup *group = taosHashGet(ctx->groupHash, &pExchNode->srcStartGroupId, sizeof(pExchNode->srcStartGroupId));
if (NULL == group) {
qError("exchange src group %d not in groupHash", pExchNode->srcGroupId);
qError("exchange src group %d not in groupHash", pExchNode->srcStartGroupId);
QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
......@@ -801,7 +801,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
}
}
QRY_ERR_RET(qExplainAppendGroupResRows(ctx, pExchNode->srcGroupId, level + 1));
QRY_ERR_RET(qExplainAppendGroupResRows(ctx, pExchNode->srcStartGroupId, level + 1));
break;
}
case QUERY_NODE_PHYSICAL_PLAN_SORT: {
......
......@@ -414,7 +414,8 @@ static int32_t logicVnodeModifCopy(const SVnodeModifyLogicNode* pSrc, SVnodeModi
static int32_t logicExchangeCopy(const SExchangeLogicNode* pSrc, SExchangeLogicNode* pDst) {
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
COPY_SCALAR_FIELD(srcGroupId);
COPY_SCALAR_FIELD(srcStartGroupId);
COPY_SCALAR_FIELD(srcEndGroupId);
return TSDB_CODE_SUCCESS;
}
......
......@@ -722,14 +722,18 @@ static int32_t jsonToLogicVnodeModifyNode(const SJson* pJson, void* pObj) {
return code;
}
static const char* jkExchangeLogicPlanSrcGroupId = "SrcGroupId";
static const char* jkExchangeLogicPlanSrcStartGroupId = "SrcStartGroupId";
static const char* jkExchangeLogicPlanSrcEndGroupId = "SrcEndGroupId";
static int32_t logicExchangeNodeToJson(const void* pObj, SJson* pJson) {
const SExchangeLogicNode* pNode = (const SExchangeLogicNode*)pObj;
int32_t code = logicPlanNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkExchangeLogicPlanSrcGroupId, pNode->srcGroupId);
code = tjsonAddIntegerToObject(pJson, jkExchangeLogicPlanSrcStartGroupId, pNode->srcStartGroupId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkExchangeLogicPlanSrcEndGroupId, pNode->srcEndGroupId);
}
return code;
......@@ -740,7 +744,10 @@ static int32_t jsonToLogicExchangeNode(const SJson* pJson, void* pObj) {
int32_t code = jsonToLogicPlanNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkExchangeLogicPlanSrcGroupId, &pNode->srcGroupId);
code = tjsonGetIntValue(pJson, jkExchangeLogicPlanSrcStartGroupId, &pNode->srcStartGroupId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkExchangeLogicPlanSrcEndGroupId, &pNode->srcEndGroupId);
}
return code;
......@@ -1833,7 +1840,8 @@ static int32_t jsonToPhysiAggNode(const SJson* pJson, void* pObj) {
return code;
}
static const char* jkExchangePhysiPlanSrcGroupId = "SrcGroupId";
static const char* jkExchangePhysiPlanSrcStartGroupId = "SrcStartGroupId";
static const char* jkExchangePhysiPlanSrcEndGroupId = "SrcEndGroupId";
static const char* jkExchangePhysiPlanSrcEndPoints = "SrcEndPoints";
static int32_t physiExchangeNodeToJson(const void* pObj, SJson* pJson) {
......@@ -1841,7 +1849,10 @@ static int32_t physiExchangeNodeToJson(const void* pObj, SJson* pJson) {
int32_t code = physicPlanNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkExchangePhysiPlanSrcGroupId, pNode->srcGroupId);
code = tjsonAddIntegerToObject(pJson, jkExchangePhysiPlanSrcStartGroupId, pNode->srcStartGroupId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkExchangePhysiPlanSrcEndGroupId, pNode->srcEndGroupId);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkExchangePhysiPlanSrcEndPoints, pNode->pSrcEndPoints);
......@@ -1855,7 +1866,10 @@ static int32_t jsonToPhysiExchangeNode(const SJson* pJson, void* pObj) {
int32_t code = jsonToPhysicPlanNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkExchangePhysiPlanSrcGroupId, &pNode->srcGroupId);
code = tjsonGetIntValue(pJson, jkExchangePhysiPlanSrcStartGroupId, &pNode->srcStartGroupId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkExchangePhysiPlanSrcEndGroupId, &pNode->srcEndGroupId);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkExchangePhysiPlanSrcEndPoints, &pNode->pSrcEndPoints);
......
......@@ -2294,7 +2294,8 @@ static int32_t msgToPhysiAggNode(STlvDecoder* pDecoder, void* pObj) {
enum {
PHY_EXCHANGE_CODE_BASE_NODE = 1,
PHY_EXCHANGE_CODE_SRC_GROUP_ID,
PHY_EXCHANGE_CODE_SRC_START_GROUP_ID,
PHY_EXCHANGE_CODE_SRC_END_GROUP_ID,
PHY_EXCHANGE_CODE_SINGLE_CHANNEL,
PHY_EXCHANGE_CODE_SRC_ENDPOINTS
};
......@@ -2304,7 +2305,10 @@ static int32_t physiExchangeNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
int32_t code = tlvEncodeObj(pEncoder, PHY_EXCHANGE_CODE_BASE_NODE, physiNodeToMsg, &pNode->node);
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI32(pEncoder, PHY_EXCHANGE_CODE_SRC_GROUP_ID, pNode->srcGroupId);
code = tlvEncodeI32(pEncoder, PHY_EXCHANGE_CODE_SRC_START_GROUP_ID, pNode->srcStartGroupId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI32(pEncoder, PHY_EXCHANGE_CODE_SRC_END_GROUP_ID, pNode->srcEndGroupId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeBool(pEncoder, PHY_EXCHANGE_CODE_SINGLE_CHANNEL, pNode->singleChannel);
......@@ -2326,8 +2330,11 @@ static int32_t msgToPhysiExchangeNode(STlvDecoder* pDecoder, void* pObj) {
case PHY_EXCHANGE_CODE_BASE_NODE:
code = tlvDecodeObjFromTlv(pTlv, msgToPhysiNode, &pNode->node);
break;
case PHY_EXCHANGE_CODE_SRC_GROUP_ID:
code = tlvDecodeI32(pTlv, &pNode->srcGroupId);
case PHY_EXCHANGE_CODE_SRC_START_GROUP_ID:
code = tlvDecodeI32(pTlv, &pNode->srcStartGroupId);
break;
case PHY_EXCHANGE_CODE_SRC_END_GROUP_ID:
code = tlvDecodeI32(pTlv, &pNode->srcEndGroupId);
break;
case PHY_EXCHANGE_CODE_SINGLE_CHANNEL:
code = tlvDecodeBool(pTlv, &pNode->singleChannel);
......
......@@ -1046,7 +1046,8 @@ static int32_t doCreateExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogic
return TSDB_CODE_OUT_OF_MEMORY;
}
pExchange->srcGroupId = pExchangeLogicNode->srcGroupId;
pExchange->srcStartGroupId = pExchangeLogicNode->srcStartGroupId;
pExchange->srcEndGroupId = pExchangeLogicNode->srcEndGroupId;
*pPhyNode = (SPhysiNode*)pExchange;
return TSDB_CODE_SUCCESS;
......@@ -1425,7 +1426,8 @@ static int32_t createExchangePhysiNodeByMerge(SMergePhysiNode* pMerge) {
if (NULL == pExchange) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pExchange->srcGroupId = pMerge->srcGroupId;
pExchange->srcStartGroupId = pMerge->srcGroupId;
pExchange->srcEndGroupId = pMerge->srcGroupId;
pExchange->singleChannel = true;
pExchange->node.pParent = (SPhysiNode*)pMerge;
pExchange->node.pOutputDataBlockDesc = (SDataBlockDescNode*)nodesCloneNode((SNode*)pMerge->node.pOutputDataBlockDesc);
......
......@@ -84,7 +84,8 @@ static int32_t splCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pChild, SE
if (NULL == pExchange) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pExchange->srcGroupId = pCxt->groupId;
pExchange->srcStartGroupId = pCxt->groupId;
pExchange->srcEndGroupId = pCxt->groupId;
pExchange->node.precision = pChild->precision;
pExchange->node.pTargets = nodesCloneList(pChild->pTargets);
if (NULL == pExchange->node.pTargets) {
......@@ -112,7 +113,8 @@ static int32_t splCreateExchangeNodeForSubplan(SSplitContext* pCxt, SLogicSubpla
static bool splIsChildSubplan(SLogicNode* pLogicNode, int32_t groupId) {
if (QUERY_NODE_LOGIC_PLAN_EXCHANGE == nodeType(pLogicNode)) {
return ((SExchangeLogicNode*)pLogicNode)->srcGroupId == groupId;
return groupId >= ((SExchangeLogicNode*)pLogicNode)->srcStartGroupId &&
groupId <= ((SExchangeLogicNode*)pLogicNode)->srcEndGroupId;
}
if (QUERY_NODE_LOGIC_PLAN_MERGE == nodeType(pLogicNode)) {
......@@ -1184,6 +1186,7 @@ static int32_t unionSplitSubplan(SSplitContext* pCxt, SLogicSubplan* pUnionSubpl
if (TSDB_CODE_SUCCESS != code) {
break;
}
++(pCxt->groupId);
}
if (TSDB_CODE_SUCCESS == code) {
nodesDestroyList(pSubplanChildren);
......@@ -1207,12 +1210,14 @@ static bool unAllSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan,
return false;
}
static int32_t unAllSplCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SProjectLogicNode* pProject) {
static int32_t unAllSplCreateExchangeNode(SSplitContext* pCxt, int32_t startGroupId, SLogicSubplan* pSubplan,
SProjectLogicNode* pProject) {
SExchangeLogicNode* pExchange = (SExchangeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE);
if (NULL == pExchange) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pExchange->srcGroupId = pCxt->groupId;
pExchange->srcStartGroupId = startGroupId;
pExchange->srcEndGroupId = pCxt->groupId - 1;
pExchange->node.precision = pProject->node.precision;
pExchange->node.pTargets = nodesCloneList(pProject->node.pTargets);
if (NULL == pExchange->node.pTargets) {
......@@ -1246,11 +1251,11 @@ static int32_t unionAllSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
return TSDB_CODE_SUCCESS;
}
int32_t startGroupId = pCxt->groupId;
int32_t code = unionSplitSubplan(pCxt, info.pSubplan, (SLogicNode*)info.pProject);
if (TSDB_CODE_SUCCESS == code) {
code = unAllSplCreateExchangeNode(pCxt, info.pSubplan, info.pProject);
code = unAllSplCreateExchangeNode(pCxt, startGroupId, info.pSubplan, info.pProject);
}
++(pCxt->groupId);
pCxt->split = true;
return code;
}
......@@ -1260,12 +1265,14 @@ typedef struct SUnionDistinctSplitInfo {
SLogicSubplan* pSubplan;
} SUnionDistinctSplitInfo;
static int32_t unDistSplCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SAggLogicNode* pAgg) {
static int32_t unDistSplCreateExchangeNode(SSplitContext* pCxt, int32_t startGroupId, SLogicSubplan* pSubplan,
SAggLogicNode* pAgg) {
SExchangeLogicNode* pExchange = (SExchangeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE);
if (NULL == pExchange) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pExchange->srcGroupId = pCxt->groupId;
pExchange->srcStartGroupId = startGroupId;
pExchange->srcEndGroupId = pCxt->groupId - 1;
pExchange->node.precision = pAgg->node.precision;
pExchange->node.pTargets = nodesCloneList(pAgg->pGroupKeys);
if (NULL == pExchange->node.pTargets) {
......@@ -1293,11 +1300,11 @@ static int32_t unionDistinctSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan)
return TSDB_CODE_SUCCESS;
}
int32_t startGroupId = pCxt->groupId;
int32_t code = unionSplitSubplan(pCxt, info.pSubplan, (SLogicNode*)info.pAgg);
if (TSDB_CODE_SUCCESS == code) {
code = unDistSplCreateExchangeNode(pCxt, info.pSubplan, info.pAgg);
code = unDistSplCreateExchangeNode(pCxt, startGroupId, info.pSubplan, info.pAgg);
}
++(pCxt->groupId);
pCxt->split = true;
return code;
}
......@@ -1430,7 +1437,7 @@ static const SSplitRule splitRuleSet[] = {
{.pName = "SingleTableJoinSplit", .splitFunc = singleTableJoinSplit},
{.pName = "UnionAllSplit", .splitFunc = unionAllSplit},
{.pName = "UnionDistinctSplit", .splitFunc = unionDistinctSplit},
{.pName = "SmaIndexSplit", .splitFunc = smaIndexSplit},
{.pName = "SmaIndexSplit", .splitFunc = smaIndexSplit}, // not used yet
{.pName = "InsertSelectSplit", .splitFunc = insertSelectSplit}
};
// clang-format on
......
......@@ -63,7 +63,7 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNo
static int32_t setSubplanExecutionNode(SPhysiNode* pNode, int32_t groupId, SDownstreamSourceNode* pSource) {
if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == nodeType(pNode)) {
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pNode;
if (pExchange->srcGroupId == groupId) {
if (groupId >= pExchange->srcStartGroupId && groupId <= pExchange->srcEndGroupId) {
return nodesListMakeStrictAppend(&pExchange->pSrcEndPoints, nodesCloneNode((SNode*)pSource));
}
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE == nodeType(pNode)) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册