提交 f77c72c2 编写于 作者: X Xiaoyu Wang

feat: order by distributed split

上级 19e2292e
...@@ -211,7 +211,7 @@ typedef enum ENodeType { ...@@ -211,7 +211,7 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_MERGE, QUERY_NODE_PHYSICAL_PLAN_MERGE,
QUERY_NODE_PHYSICAL_PLAN_SORT, QUERY_NODE_PHYSICAL_PLAN_SORT,
QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL, QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_SORT_MERGE_INTERVAL, QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL, QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL, QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL,
......
...@@ -42,6 +42,7 @@ typedef struct SScanLogicNode { ...@@ -42,6 +42,7 @@ typedef struct SScanLogicNode {
SNodeList* pScanPseudoCols; SNodeList* pScanPseudoCols;
int8_t tableType; int8_t tableType;
uint64_t tableId; uint64_t tableId;
uint64_t stableId;
SVgroupsInfo* pVgroupList; SVgroupsInfo* pVgroupList;
EScanType scanType; EScanType scanType;
uint8_t scanSeq[2]; // first is scan count, and second is reverse scan count uint8_t scanSeq[2]; // first is scan count, and second is reverse scan count
...@@ -109,6 +110,7 @@ typedef struct SExchangeLogicNode { ...@@ -109,6 +110,7 @@ typedef struct SExchangeLogicNode {
typedef struct SMergeLogicNode { typedef struct SMergeLogicNode {
SLogicNode node; SLogicNode node;
SNodeList* pMergeKeys; SNodeList* pMergeKeys;
SNodeList* pInputs;
int32_t numOfChannels; int32_t numOfChannels;
int32_t srcGroupId; int32_t srcGroupId;
} SMergeLogicNode; } SMergeLogicNode;
...@@ -117,7 +119,7 @@ typedef enum EWindowType { WINDOW_TYPE_INTERVAL = 1, WINDOW_TYPE_SESSION, WINDOW ...@@ -117,7 +119,7 @@ typedef enum EWindowType { WINDOW_TYPE_INTERVAL = 1, WINDOW_TYPE_SESSION, WINDOW
typedef enum EIntervalAlgorithm { typedef enum EIntervalAlgorithm {
INTERVAL_ALGO_HASH = 1, INTERVAL_ALGO_HASH = 1,
INTERVAL_ALGO_SORT_MERGE, INTERVAL_ALGO_MERGE,
INTERVAL_ALGO_STREAM_FINAL, INTERVAL_ALGO_STREAM_FINAL,
INTERVAL_ALGO_STREAM_SEMI, INTERVAL_ALGO_STREAM_SEMI,
INTERVAL_ALGO_STREAM_SINGLE, INTERVAL_ALGO_STREAM_SINGLE,
...@@ -220,6 +222,7 @@ typedef struct SScanPhysiNode { ...@@ -220,6 +222,7 @@ typedef struct SScanPhysiNode {
SNodeList* pScanCols; SNodeList* pScanCols;
SNodeList* pScanPseudoCols; SNodeList* pScanPseudoCols;
uint64_t uid; // unique id of the table uint64_t uid; // unique id of the table
uint64_t suid;
int8_t tableType; int8_t tableType;
SName tableName; SName tableName;
} SScanPhysiNode; } SScanPhysiNode;
...@@ -296,6 +299,7 @@ typedef struct SExchangePhysiNode { ...@@ -296,6 +299,7 @@ typedef struct SExchangePhysiNode {
typedef struct SMergePhysiNode { typedef struct SMergePhysiNode {
SPhysiNode node; SPhysiNode node;
SNodeList* pMergeKeys; SNodeList* pMergeKeys;
SNodeList* pTargets;
int32_t numOfChannels; int32_t numOfChannels;
int32_t srcGroupId; int32_t srcGroupId;
} SMergePhysiNode; } SMergePhysiNode;
...@@ -319,7 +323,7 @@ typedef struct SIntervalPhysiNode { ...@@ -319,7 +323,7 @@ typedef struct SIntervalPhysiNode {
int8_t slidingUnit; int8_t slidingUnit;
} SIntervalPhysiNode; } SIntervalPhysiNode;
typedef SIntervalPhysiNode SSortMergeIntervalPhysiNode; typedef SIntervalPhysiNode SMergeIntervalPhysiNode;
typedef SIntervalPhysiNode SStreamIntervalPhysiNode; typedef SIntervalPhysiNode SStreamIntervalPhysiNode;
typedef SIntervalPhysiNode SStreamFinalIntervalPhysiNode; typedef SIntervalPhysiNode SStreamFinalIntervalPhysiNode;
typedef SIntervalPhysiNode SStreamSemiIntervalPhysiNode; typedef SIntervalPhysiNode SStreamSemiIntervalPhysiNode;
......
...@@ -91,7 +91,7 @@ int32_t mndCheckColAndTagModifiable(SMnode *pMnode, int64_t suid, col_id_t colId ...@@ -91,7 +91,7 @@ int32_t mndCheckColAndTagModifiable(SMnode *pMnode, int64_t suid, col_id_t colId
} }
SHashObj *pColHash = NULL; SHashObj *pColHash = NULL;
SNodeList *pNodeList; SNodeList *pNodeList = NULL;
nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList); nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList);
SNode *pNode = NULL; SNode *pNode = NULL;
FOREACH(pNode, pNodeList) { FOREACH(pNode, pNodeList) {
......
...@@ -318,6 +318,7 @@ static SNode* logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) { ...@@ -318,6 +318,7 @@ static SNode* logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) {
CLONE_NODE_LIST_FIELD(pScanPseudoCols); CLONE_NODE_LIST_FIELD(pScanPseudoCols);
COPY_SCALAR_FIELD(tableType); COPY_SCALAR_FIELD(tableType);
COPY_SCALAR_FIELD(tableId); COPY_SCALAR_FIELD(tableId);
COPY_SCALAR_FIELD(stableId);
CLONE_OBJECT_FIELD(pVgroupList, vgroupsInfoClone); CLONE_OBJECT_FIELD(pVgroupList, vgroupsInfoClone);
COPY_SCALAR_FIELD(scanType); COPY_SCALAR_FIELD(scanType);
COPY_OBJECT_FIELD(scanSeq[0], sizeof(uint8_t) * 2); COPY_OBJECT_FIELD(scanSeq[0], sizeof(uint8_t) * 2);
...@@ -387,6 +388,7 @@ static SNode* logicExchangeCopy(const SExchangeLogicNode* pSrc, SExchangeLogicNo ...@@ -387,6 +388,7 @@ static SNode* logicExchangeCopy(const SExchangeLogicNode* pSrc, SExchangeLogicNo
static SNode* logicMergeCopy(const SMergeLogicNode* pSrc, SMergeLogicNode* pDst) { static SNode* logicMergeCopy(const SMergeLogicNode* pSrc, SMergeLogicNode* pDst) {
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy); COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
CLONE_NODE_LIST_FIELD(pMergeKeys); CLONE_NODE_LIST_FIELD(pMergeKeys);
CLONE_NODE_LIST_FIELD(pInputs);
COPY_SCALAR_FIELD(numOfChannels); COPY_SCALAR_FIELD(numOfChannels);
COPY_SCALAR_FIELD(srcGroupId); COPY_SCALAR_FIELD(srcGroupId);
return (SNode*)pDst; return (SNode*)pDst;
......
...@@ -230,6 +230,8 @@ const char* nodesNodeName(ENodeType type) { ...@@ -230,6 +230,8 @@ const char* nodesNodeName(ENodeType type) {
return "PhysiSort"; return "PhysiSort";
case QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL:
return "PhysiHashInterval"; return "PhysiHashInterval";
case QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL:
return "PhysiMergeInterval";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
return "PhysiStreamInterval"; return "PhysiStreamInterval";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL:
...@@ -1249,6 +1251,7 @@ static int32_t jsonToName(const SJson* pJson, void* pObj) { ...@@ -1249,6 +1251,7 @@ static int32_t jsonToName(const SJson* pJson, void* pObj) {
static const char* jkScanPhysiPlanScanCols = "ScanCols"; static const char* jkScanPhysiPlanScanCols = "ScanCols";
static const char* jkScanPhysiPlanScanPseudoCols = "ScanPseudoCols"; static const char* jkScanPhysiPlanScanPseudoCols = "ScanPseudoCols";
static const char* jkScanPhysiPlanTableId = "TableId"; static const char* jkScanPhysiPlanTableId = "TableId";
static const char* jkScanPhysiPlanSTableId = "STableId";
static const char* jkScanPhysiPlanTableType = "TableType"; static const char* jkScanPhysiPlanTableType = "TableType";
static const char* jkScanPhysiPlanTableName = "TableName"; static const char* jkScanPhysiPlanTableName = "TableName";
...@@ -1265,6 +1268,9 @@ static int32_t physiScanNodeToJson(const void* pObj, SJson* pJson) { ...@@ -1265,6 +1268,9 @@ static int32_t physiScanNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanTableId, pNode->uid); code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanTableId, pNode->uid);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanSTableId, pNode->suid);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanTableType, pNode->tableType); code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanTableType, pNode->tableType);
} }
...@@ -1288,6 +1294,9 @@ static int32_t jsonToPhysiScanNode(const SJson* pJson, void* pObj) { ...@@ -1288,6 +1294,9 @@ static int32_t jsonToPhysiScanNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetUBigIntValue(pJson, jkScanPhysiPlanTableId, &pNode->uid); code = tjsonGetUBigIntValue(pJson, jkScanPhysiPlanTableId, &pNode->uid);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetUBigIntValue(pJson, jkScanPhysiPlanSTableId, &pNode->suid);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetTinyIntValue(pJson, jkScanPhysiPlanTableType, &pNode->tableType); code = tjsonGetTinyIntValue(pJson, jkScanPhysiPlanTableType, &pNode->tableType);
} }
...@@ -1644,6 +1653,7 @@ static int32_t jsonToPhysiExchangeNode(const SJson* pJson, void* pObj) { ...@@ -1644,6 +1653,7 @@ static int32_t jsonToPhysiExchangeNode(const SJson* pJson, void* pObj) {
} }
static const char* jkMergePhysiPlanMergeKeys = "MergeKeys"; static const char* jkMergePhysiPlanMergeKeys = "MergeKeys";
static const char* jkMergePhysiPlanTargets = "Targets";
static const char* jkMergePhysiPlanNumOfChannels = "NumOfChannels"; static const char* jkMergePhysiPlanNumOfChannels = "NumOfChannels";
static const char* jkMergePhysiPlanSrcGroupId = "SrcGroupId"; static const char* jkMergePhysiPlanSrcGroupId = "SrcGroupId";
...@@ -1654,6 +1664,9 @@ static int32_t physiMergeNodeToJson(const void* pObj, SJson* pJson) { ...@@ -1654,6 +1664,9 @@ static int32_t physiMergeNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkMergePhysiPlanMergeKeys, pNode->pMergeKeys); code = nodeListToJson(pJson, jkMergePhysiPlanMergeKeys, pNode->pMergeKeys);
} }
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkMergePhysiPlanTargets, pNode->pTargets);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkMergePhysiPlanNumOfChannels, pNode->numOfChannels); code = tjsonAddIntegerToObject(pJson, jkMergePhysiPlanNumOfChannels, pNode->numOfChannels);
} }
...@@ -1671,6 +1684,9 @@ static int32_t jsonToPhysiMergeNode(const SJson* pJson, void* pObj) { ...@@ -1671,6 +1684,9 @@ static int32_t jsonToPhysiMergeNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkMergePhysiPlanMergeKeys, &pNode->pMergeKeys); code = jsonToNodeList(pJson, jkMergePhysiPlanMergeKeys, &pNode->pMergeKeys);
} }
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkMergePhysiPlanTargets, &pNode->pTargets);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkMergePhysiPlanNumOfChannels, &pNode->numOfChannels); code = tjsonGetIntValue(pJson, jkMergePhysiPlanNumOfChannels, &pNode->numOfChannels);
} }
...@@ -3802,6 +3818,7 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { ...@@ -3802,6 +3818,7 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
case QUERY_NODE_PHYSICAL_PLAN_SORT: case QUERY_NODE_PHYSICAL_PLAN_SORT:
return physiSortNodeToJson(pObj, pJson); return physiSortNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
...@@ -3930,6 +3947,7 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { ...@@ -3930,6 +3947,7 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
case QUERY_NODE_PHYSICAL_PLAN_SORT: case QUERY_NODE_PHYSICAL_PLAN_SORT:
return jsonToPhysiSortNode(pJson, pObj); return jsonToPhysiSortNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
......
...@@ -260,8 +260,8 @@ SNodeptr nodesMakeNode(ENodeType type) { ...@@ -260,8 +260,8 @@ SNodeptr nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SSortPhysiNode)); return makeNode(type, sizeof(SSortPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL:
return makeNode(type, sizeof(SIntervalPhysiNode)); return makeNode(type, sizeof(SIntervalPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_SORT_MERGE_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL:
return makeNode(type, sizeof(SSortMergeIntervalPhysiNode)); return makeNode(type, sizeof(SMergeIntervalPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
return makeNode(type, sizeof(SStreamIntervalPhysiNode)); return makeNode(type, sizeof(SStreamIntervalPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL:
......
...@@ -51,7 +51,7 @@ TEST_F(ParserInitialDTest, dropBnode) { ...@@ -51,7 +51,7 @@ TEST_F(ParserInitialDTest, dropBnode) {
} }
// DROP CONSUMER GROUP [ IF EXISTS ] cgroup_name ON topic_name // DROP CONSUMER GROUP [ IF EXISTS ] cgroup_name ON topic_name
TEST_F(ParserInitialDTest, dropCGroup) { TEST_F(ParserInitialDTest, dropConsumerGroup) {
useDb("root", "test"); useDb("root", "test");
SMDropCgroupReq expect = {0}; SMDropCgroupReq expect = {0};
......
...@@ -219,9 +219,9 @@ static int32_t makeScanLogicNode(SLogicPlanContext* pCxt, SRealTableNode* pRealT ...@@ -219,9 +219,9 @@ static int32_t makeScanLogicNode(SLogicPlanContext* pCxt, SRealTableNode* pRealT
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
// TSWAP(pScan->pMeta, pRealTable->pMeta);
TSWAP(pScan->pVgroupList, pRealTable->pVgroupList); TSWAP(pScan->pVgroupList, pRealTable->pVgroupList);
pScan->tableId = pRealTable->pMeta->uid; pScan->tableId = pRealTable->pMeta->uid;
pScan->stableId = pRealTable->pMeta->suid;
pScan->tableType = pRealTable->pMeta->tableType; pScan->tableType = pRealTable->pMeta->tableType;
pScan->scanSeq[0] = hasRepeatScanFuncs ? 2 : 1; pScan->scanSeq[0] = hasRepeatScanFuncs ? 2 : 1;
pScan->scanSeq[1] = 0; pScan->scanSeq[1] = 0;
......
...@@ -425,6 +425,7 @@ static int32_t createScanPhysiNodeFinalize(SPhysiPlanContext* pCxt, SSubplan* pS ...@@ -425,6 +425,7 @@ static int32_t createScanPhysiNodeFinalize(SPhysiPlanContext* pCxt, SSubplan* pS
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
pScanPhysiNode->uid = pScanLogicNode->tableId; pScanPhysiNode->uid = pScanLogicNode->tableId;
pScanPhysiNode->suid = pScanLogicNode->stableId;
pScanPhysiNode->tableType = pScanLogicNode->tableType; pScanPhysiNode->tableType = pScanLogicNode->tableType;
memcpy(&pScanPhysiNode->tableName, &pScanLogicNode->tableName, sizeof(SName)); memcpy(&pScanPhysiNode->tableName, &pScanLogicNode->tableName, sizeof(SName));
if (NULL != pScanLogicNode->pTagCond) { if (NULL != pScanLogicNode->pTagCond) {
...@@ -923,8 +924,8 @@ static ENodeType getIntervalOperatorType(EIntervalAlgorithm intervalAlgo) { ...@@ -923,8 +924,8 @@ static ENodeType getIntervalOperatorType(EIntervalAlgorithm intervalAlgo) {
switch (intervalAlgo) { switch (intervalAlgo) {
case INTERVAL_ALGO_HASH: case INTERVAL_ALGO_HASH:
return QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL; return QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL;
case INTERVAL_ALGO_SORT_MERGE: case INTERVAL_ALGO_MERGE:
return QUERY_NODE_PHYSICAL_PLAN_SORT_MERGE_INTERVAL; return QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL;
case INTERVAL_ALGO_STREAM_FINAL: case INTERVAL_ALGO_STREAM_FINAL:
return QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL; return QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL;
case INTERVAL_ALGO_STREAM_SEMI: case INTERVAL_ALGO_STREAM_SEMI:
...@@ -1155,6 +1156,8 @@ static int32_t createExchangePhysiNodeByMerge(SMergePhysiNode* pMerge) { ...@@ -1155,6 +1156,8 @@ static int32_t createExchangePhysiNodeByMerge(SMergePhysiNode* pMerge) {
nodesDestroyNode(pExchange); nodesDestroyNode(pExchange);
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
SNode* pSlot = NULL;
FOREACH(pSlot, pExchange->node.pOutputDataBlockDesc->pSlots) { ((SSlotDescNode*)pSlot)->output = true; }
return nodesListMakeStrictAppend(&pMerge->node.pChildren, pExchange); return nodesListMakeStrictAppend(&pMerge->node.pChildren, pExchange);
} }
...@@ -1168,12 +1171,14 @@ static int32_t createMergePhysiNode(SPhysiPlanContext* pCxt, SMergeLogicNode* pM ...@@ -1168,12 +1171,14 @@ static int32_t createMergePhysiNode(SPhysiPlanContext* pCxt, SMergeLogicNode* pM
pMerge->numOfChannels = pMergeLogicNode->numOfChannels; pMerge->numOfChannels = pMergeLogicNode->numOfChannels;
pMerge->srcGroupId = pMergeLogicNode->srcGroupId; pMerge->srcGroupId = pMergeLogicNode->srcGroupId;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = addDataBlockSlots(pCxt, pMergeLogicNode->pInputs, pMerge->node.pOutputDataBlockDesc);
for (int32_t i = 0; i < pMerge->numOfChannels; ++i) { if (TSDB_CODE_SUCCESS == code) {
code = createExchangePhysiNodeByMerge(pMerge); for (int32_t i = 0; i < pMerge->numOfChannels; ++i) {
if (TSDB_CODE_SUCCESS != code) { code = createExchangePhysiNodeByMerge(pMerge);
break; if (TSDB_CODE_SUCCESS != code) {
break;
}
} }
} }
...@@ -1182,6 +1187,11 @@ static int32_t createMergePhysiNode(SPhysiPlanContext* pCxt, SMergeLogicNode* pM ...@@ -1182,6 +1187,11 @@ static int32_t createMergePhysiNode(SPhysiPlanContext* pCxt, SMergeLogicNode* pM
&pMerge->pMergeKeys); &pMerge->pMergeKeys);
} }
if (TSDB_CODE_SUCCESS == code) {
code = setListSlotId(pCxt, pMerge->node.pOutputDataBlockDesc->dataBlockId, -1, pMergeLogicNode->node.pTargets,
&pMerge->pTargets);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
*pPhyNode = (SPhysiNode*)pMerge; *pPhyNode = (SPhysiNode*)pMerge;
} else { } else {
......
...@@ -80,30 +80,36 @@ static int32_t splCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pChild, SE ...@@ -80,30 +80,36 @@ static int32_t splCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pChild, SE
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t splCreateExchangeNodeForSubplan(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pSplitNode, static int32_t splReplaceLogicNode(SLogicSubplan* pSubplan, SLogicNode* pOld, SLogicNode* pNew) {
ESubplanType subplanType) { if (NULL == pOld->pParent) {
SExchangeLogicNode* pExchange = NULL; pSubplan->pNode = (SLogicNode*)pNew;
if (TSDB_CODE_SUCCESS != splCreateExchangeNode(pCxt, pSplitNode, &pExchange)) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pSubplan->subplanType = subplanType;
if (NULL == pSplitNode->pParent) {
pSubplan->pNode = (SLogicNode*)pExchange;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SNode* pNode; SNode* pNode;
FOREACH(pNode, pSplitNode->pParent->pChildren) { FOREACH(pNode, pOld->pParent->pChildren) {
if (nodesEqualNode(pNode, pSplitNode)) { if (nodesEqualNode(pNode, pOld)) {
REPLACE_NODE(pExchange); REPLACE_NODE(pNew);
pExchange->node.pParent = pSplitNode->pParent; pNew->pParent = pOld->pParent;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} }
nodesDestroyNode(pExchange); return TSDB_CODE_PLAN_INTERNAL_ERROR;
return TSDB_CODE_FAILED; }
static int32_t splCreateExchangeNodeForSubplan(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pSplitNode,
ESubplanType subplanType) {
SExchangeLogicNode* pExchange = NULL;
int32_t code = splCreateExchangeNode(pCxt, pSplitNode, &pExchange);
if (TSDB_CODE_SUCCESS == code) {
code = splReplaceLogicNode(pSubplan, pSplitNode, (SLogicNode*)pExchange);
}
if (TSDB_CODE_SUCCESS == code) {
pSubplan->subplanType = subplanType;
} else {
nodesDestroyNode(pExchange);
}
return code;
} }
static bool splMatch(SSplitContext* pCxt, SLogicSubplan* pSubplan, int32_t flag, FSplFindSplitNode func, void* pInfo) { static bool splMatch(SSplitContext* pCxt, SLogicSubplan* pSubplan, int32_t flag, FSplFindSplitNode func, void* pInfo) {
...@@ -295,24 +301,34 @@ static int32_t stbSplCreatePartWindowNode(SWindowLogicNode* pMergeWindow, SLogic ...@@ -295,24 +301,34 @@ static int32_t stbSplCreatePartWindowNode(SWindowLogicNode* pMergeWindow, SLogic
return code; return code;
} }
static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicNode* pParent, SNodeList* pMergeKeys, static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pSplitNode,
SLogicNode* pPartChild) { SNodeList* pMergeKeys, SLogicNode* pPartChild) {
SMergeLogicNode* pMerge = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_MERGE); SMergeLogicNode* pMerge = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_MERGE);
if (NULL == pMerge) { if (NULL == pMerge) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
pMerge->numOfChannels = ((SScanLogicNode*)nodesListGetNode(pPartChild->pChildren, 0))->pVgroupList->numOfVgroups; pMerge->numOfChannels = ((SScanLogicNode*)nodesListGetNode(pPartChild->pChildren, 0))->pVgroupList->numOfVgroups;
pMerge->srcGroupId = pCxt->groupId; pMerge->srcGroupId = pCxt->groupId;
pMerge->node.pParent = pParent;
pMerge->node.precision = pPartChild->precision; pMerge->node.precision = pPartChild->precision;
pMerge->pMergeKeys = pMergeKeys; pMerge->pMergeKeys = pMergeKeys;
pMerge->node.pTargets = nodesCloneList(pPartChild->pTargets);
if (NULL == pMerge->node.pTargets) { int32_t code = TSDB_CODE_SUCCESS;
pMerge->pInputs = nodesCloneList(pPartChild->pTargets);
pMerge->node.pTargets = nodesCloneList(pSplitNode->pTargets);
if (NULL == pMerge->node.pTargets || NULL == pMerge->pInputs) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
if (TSDB_CODE_SUCCESS == code) {
if (NULL == pSubplan) {
code = nodesListMakeAppend(&pSplitNode->pChildren, pMerge);
} else {
code = splReplaceLogicNode(pSubplan, pSplitNode, (SLogicNode*)pMerge);
}
}
if (TSDB_CODE_SUCCESS != code) {
nodesDestroyNode(pMerge); nodesDestroyNode(pMerge);
return TSDB_CODE_OUT_OF_MEMORY;
} }
return code;
return nodesListMakeAppend(&pParent->pChildren, pMerge);
} }
static int32_t stbSplCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pParent, SLogicNode* pPartChild) { static int32_t stbSplCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pParent, SLogicNode* pPartChild) {
...@@ -329,8 +345,15 @@ static int32_t stbSplSplitWindowNodeForBatch(SSplitContext* pCxt, SStableSplitIn ...@@ -329,8 +345,15 @@ static int32_t stbSplSplitWindowNodeForBatch(SSplitContext* pCxt, SStableSplitIn
int32_t code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow); int32_t code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
((SWindowLogicNode*)pPartWindow)->intervalAlgo = INTERVAL_ALGO_HASH; ((SWindowLogicNode*)pPartWindow)->intervalAlgo = INTERVAL_ALGO_HASH;
((SWindowLogicNode*)pInfo->pSplitNode)->intervalAlgo = INTERVAL_ALGO_SORT_MERGE; ((SWindowLogicNode*)pInfo->pSplitNode)->intervalAlgo = INTERVAL_ALGO_MERGE;
code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartWindow); SNodeList* pMergeKeys = NULL;
code = nodesListMakeStrictAppend(&pMergeKeys, nodesCloneNode(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk));
if (TSDB_CODE_SUCCESS == code) {
code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, pMergeKeys, pPartWindow);
}
if (TSDB_CODE_SUCCESS != code) {
nodesDestroyList(pMergeKeys);
}
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
...@@ -424,37 +447,99 @@ static int32_t stbSplSplitAggNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) ...@@ -424,37 +447,99 @@ static int32_t stbSplSplitAggNode(SSplitContext* pCxt, SStableSplitInfo* pInfo)
return code; return code;
} }
static int32_t stbSplCreatePartSortNode(SSortLogicNode* pMergeSort, SLogicNode** pOutput) { static SNode* stbSplCreateColumnNode(SExprNode* pExpr) {
SNodeList* pSortKeys = pMergeSort->pSortKeys; SColumnNode* pCol = nodesMakeNode(QUERY_NODE_COLUMN);
pMergeSort->pSortKeys = NULL; if (NULL == pCol) {
SNodeList* pTargets = pMergeSort->node.pTargets; return NULL;
pMergeSort->node.pTargets = NULL; }
SNodeList* pChildren = pMergeSort->node.pChildren; if (QUERY_NODE_COLUMN == nodeType(pExpr)) {
pMergeSort->node.pChildren = NULL; strcpy(pCol->tableAlias, ((SColumnNode*)pExpr)->tableAlias);
}
strcpy(pCol->colName, pExpr->aliasName);
strcpy(pCol->node.aliasName, pExpr->aliasName);
pCol->node.resType = pExpr->resType;
return (SNode*)pCol;
}
static SNode* stbSplCreateOrderByExpr(SOrderByExprNode* pSortKey, SNode* pCol) {
SOrderByExprNode* pOutput = nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR);
if (NULL == pOutput) {
return NULL;
}
pOutput->pExpr = nodesCloneNode(pCol);
if (NULL == pOutput->pExpr) {
nodesDestroyNode(pOutput);
return NULL;
}
pOutput->order = pSortKey->order;
pOutput->nullOrder = pSortKey->nullOrder;
return (SNode*)pOutput;
}
static int32_t stbSplCreateMergeKeys(SNodeList* pSortKeys, SNodeList* pTargets, SNodeList** pOutput) {
int32_t code = TSDB_CODE_SUCCESS;
SNodeList* pMergeKeys = NULL;
SNode* pNode = NULL;
FOREACH(pNode, pSortKeys) {
SOrderByExprNode* pSortKey = (SOrderByExprNode*)pNode;
SNode* pTarget = NULL;
bool found = false;
FOREACH(pTarget, pTargets) {
if (0 == strcmp(((SExprNode*)pSortKey->pExpr)->aliasName, ((SColumnNode*)pTarget)->colName)) {
code = nodesListMakeStrictAppend(&pMergeKeys, stbSplCreateOrderByExpr(pSortKey, pTarget));
if (TSDB_CODE_SUCCESS != code) {
break;
}
found = true;
}
}
if (TSDB_CODE_SUCCESS == code && !found) {
SNode* pCol = stbSplCreateColumnNode((SExprNode*)pSortKey->pExpr);
code = nodesListMakeStrictAppend(&pMergeKeys, stbSplCreateOrderByExpr(pSortKey, pCol));
if (TSDB_CODE_SUCCESS == code) {
code = nodesListStrictAppend(pTargets, pCol);
} else {
nodesDestroyNode(pCol);
}
}
if (TSDB_CODE_SUCCESS != code) {
break;
}
}
if (TSDB_CODE_SUCCESS == code) {
*pOutput = pMergeKeys;
} else {
nodesDestroyList(pMergeKeys);
}
return code;
}
static int32_t stbSplCreatePartSortNode(SSortLogicNode* pSort, SLogicNode** pOutputPartSort,
SNodeList** pOutputMergeKeys) {
SNodeList* pSortKeys = pSort->pSortKeys;
pSort->pSortKeys = NULL;
SNodeList* pChildren = pSort->node.pChildren;
pSort->node.pChildren = NULL;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SSortLogicNode* pPartSort = nodesCloneNode(pMergeSort); SSortLogicNode* pPartSort = nodesCloneNode(pSort);
if (NULL == pPartSort) { if (NULL == pPartSort) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
} }
pMergeSort->node.pTargets = pTargets; SNodeList* pMergeKeys = NULL;
pPartSort->node.pChildren = pChildren;
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
pPartSort->node.pChildren = pChildren;
pPartSort->pSortKeys = pSortKeys; pPartSort->pSortKeys = pSortKeys;
code = createColumnByRewriteExps(pPartSort->pSortKeys, &pPartSort->node.pTargets); code = stbSplCreateMergeKeys(pPartSort->pSortKeys, pPartSort->node.pTargets, &pMergeKeys);
}
if (TSDB_CODE_SUCCESS == code) {
pMergeSort->pSortKeys = nodesCloneList(pPartSort->node.pTargets);
if (NULL == pMergeSort->pSortKeys) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
*pOutput = (SLogicNode*)pPartSort; *pOutputPartSort = (SLogicNode*)pPartSort;
*pOutputMergeKeys = pMergeKeys;
} else { } else {
nodesDestroyNode(pPartSort); nodesDestroyNode(pPartSort);
nodesDestroyList(pMergeKeys);
} }
return code; return code;
...@@ -462,17 +547,10 @@ static int32_t stbSplCreatePartSortNode(SSortLogicNode* pMergeSort, SLogicNode** ...@@ -462,17 +547,10 @@ static int32_t stbSplCreatePartSortNode(SSortLogicNode* pMergeSort, SLogicNode**
static int32_t stbSplSplitSortNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) { static int32_t stbSplSplitSortNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
SLogicNode* pPartSort = NULL; SLogicNode* pPartSort = NULL;
int32_t code = stbSplCreatePartSortNode((SSortLogicNode*)pInfo->pSplitNode, &pPartSort); SNodeList* pMergeKeys = NULL;
int32_t code = stbSplCreatePartSortNode((SSortLogicNode*)pInfo->pSplitNode, &pPartSort, &pMergeKeys);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
SNodeList* pMergeKeys = nodesCloneList(((SSortLogicNode*)pInfo->pSplitNode)->pSortKeys); code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pInfo->pSplitNode, pMergeKeys, pPartSort);
if (NULL != pMergeKeys) {
code = stbSplCreateMergeNode(pCxt, pInfo->pSplitNode, pMergeKeys, pPartSort);
if (TSDB_CODE_SUCCESS != code) {
nodesDestroyList(pMergeKeys);
}
} else {
code = TSDB_CODE_OUT_OF_MEMORY;
}
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
......
...@@ -46,4 +46,7 @@ TEST_F(PlanOrderByTest, stable) { ...@@ -46,4 +46,7 @@ TEST_F(PlanOrderByTest, stable) {
// ORDER BY key is in the projection list // ORDER BY key is in the projection list
run("SELECT c1 FROM st1 ORDER BY c1"); run("SELECT c1 FROM st1 ORDER BY c1");
// ORDER BY key is not in the projection list
run("SELECT c2 FROM st1 ORDER BY c1");
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册